From a29292652bf4a50fa00b7af67a3ed29201048e36 Mon Sep 17 00:00:00 2001 From: Hadi Moshayedi Date: Thu, 6 Feb 2020 14:47:55 -0800 Subject: [PATCH] Refactor intermediate result (de-)serialize methods --- .../executor/intermediate_result_encoder.c | 324 ++++++++++++++++++ .../executor/intermediate_results.c | 136 +++----- .../distributed/executor/multi_executor.c | 84 +---- .../partitioned_intermediate_results.c | 5 +- .../worker/worker_sql_task_protocol.c | 82 ++--- .../distributed/intermediate_results.h | 31 ++ src/include/distributed/multi_executor.h | 2 - src/include/distributed/worker_protocol.h | 3 +- 8 files changed, 445 insertions(+), 222 deletions(-) create mode 100644 src/backend/distributed/executor/intermediate_result_encoder.c diff --git a/src/backend/distributed/executor/intermediate_result_encoder.c b/src/backend/distributed/executor/intermediate_result_encoder.c new file mode 100644 index 000000000..817ca1b13 --- /dev/null +++ b/src/backend/distributed/executor/intermediate_result_encoder.c @@ -0,0 +1,324 @@ +/*------------------------------------------------------------------------- + * + * intermediate_result_encoder.c + * Functions for encoding and decoding intermediate results. + * + * Copyright (c) Citus Data, Inc. 
+ * + *------------------------------------------------------------------------- + */ +#include +#include + +#include "postgres.h" +#include "funcapi.h" +#include "libpq-fe.h" +#include "miscadmin.h" +#include "pgstat.h" + +#include "catalog/pg_enum.h" +#include "commands/copy.h" +#include "distributed/commands/multi_copy.h" +#include "distributed/connection_management.h" +#include "distributed/intermediate_results.h" +#include "distributed/master_metadata_utility.h" +#include "distributed/metadata_cache.h" +#include "distributed/multi_client_executor.h" +#include "distributed/multi_executor.h" +#include "distributed/remote_commands.h" +#include "distributed/transmit.h" +#include "distributed/transaction_identifier.h" +#include "distributed/tuplestore.h" +#include "distributed/version_compat.h" +#include "distributed/worker_protocol.h" +#include "mb/pg_wchar.h" +#include "nodes/makefuncs.h" +#include "nodes/parsenodes.h" +#include "nodes/primnodes.h" +#include "storage/fd.h" +#include "tcop/tcopprot.h" +#include "utils/builtins.h" +#include "utils/lsyscache.h" +#include "utils/memutils.h" +#include "utils/syscache.h" + + +#define ENCODER_BUFFER_SIZE (4 * 1024 * 1024) + +/* internal state of intermediate result file encoder */ +struct IntermediateResultEncoder +{ + /* + * Result of encoding is accumulated in outputBuffer, until it is flushed. + * The flush is done when the data is returned as result of + * IntermediateResultEncoderReceive() and IntermediateResultEncoderDone() + * functions. + */ + StringInfo outputBuffer; + + /* + * Used for returning the flushed result of encoding. When flushing, the + * data pointer of outputBuffer is shared with flushBuffer before resetting + * the length of outputBuffer, so the returned data is only valid until the + * next row is encoded. This is kept here to avoid allocating it every time + * we need to flush some data. 
+ */ + StringInfo flushBuffer; + + IntermediateResultFormat format; + TupleDesc tupleDescriptor; + + /* used when format is *_COPY_FORMAT */ + CopyOutState copyOutState; + FmgrInfo *columnOutputFunctions; +}; + +/* forward declaration of local functions */ +static void ReadCopyFileIntoTupleStore(char *fileName, char *copyFormat, + TupleDesc tupleDescriptor, + Tuplestorestate *tupstore); +static Relation StubRelation(TupleDesc tupleDescriptor); + + +/* + * IntermediateResultEncoderCreate returns an encoder which can encode tuples of + * given tupleDesc form using the given format. Encoder possibly can add header + * data at this stage. + */ +IntermediateResultEncoder * +IntermediateResultEncoderCreate(TupleDesc tupleDesc, + IntermediateResultFormat format, + MemoryContext tupleContext) +{ + IntermediateResultEncoder *encoder = palloc0(sizeof(IntermediateResultEncoder)); + encoder->format = format; + encoder->tupleDescriptor = CreateTupleDescCopy(tupleDesc); + encoder->outputBuffer = makeStringInfo(); + encoder->flushBuffer = makeStringInfo(); + + if (format == TEXT_COPY_FORMAT || format == BINARY_COPY_FORMAT) + { + int fileEncoding = pg_get_client_encoding(); + int databaseEncoding = GetDatabaseEncoding(); + int databaseEncodingMaxLength = pg_database_encoding_max_length(); + + char *nullPrint = "\\N"; + int nullPrintLen = strlen(nullPrint); + char *nullPrintClient = pg_server_to_any(nullPrint, nullPrintLen, fileEncoding); + + CopyOutState copyOutState = palloc0(sizeof(CopyOutStateData)); + copyOutState->delim = "\t"; + copyOutState->null_print = nullPrint; + copyOutState->null_print_client = nullPrintClient; + copyOutState->binary = (format == BINARY_COPY_FORMAT); + copyOutState->fe_msgbuf = encoder->outputBuffer; + copyOutState->rowcontext = tupleContext; + + if (PG_ENCODING_IS_CLIENT_ONLY(fileEncoding)) + { + ereport(ERROR, (errmsg("cannot repartition into encoding caller " + "cannot receive"))); + } + + /* set up transcoding information and default text output 
characters */ + copyOutState->need_transcoding = (fileEncoding != databaseEncoding) || + (databaseEncodingMaxLength > 1); + + encoder->copyOutState = copyOutState; + encoder->columnOutputFunctions = + ColumnOutputFunctions(tupleDesc, copyOutState->binary); + + if (copyOutState->binary) + { + AppendCopyBinaryHeaders(copyOutState); + } + } + + return encoder; +} + + +/* + * IntermediateResultEncoderReceive encodes the next row with the given encoder. + */ +StringInfo +IntermediateResultEncoderReceive(IntermediateResultEncoder *encoder, + Datum *values, bool *nulls) +{ + if (encoder->format == TEXT_COPY_FORMAT || + encoder->format == BINARY_COPY_FORMAT) + { + AppendCopyRowData(values, nulls, encoder->tupleDescriptor, + encoder->copyOutState, encoder->columnOutputFunctions, NULL); + } + + if (encoder->outputBuffer->len > ENCODER_BUFFER_SIZE) + { + encoder->flushBuffer->data = encoder->outputBuffer->data; + encoder->flushBuffer->len = encoder->outputBuffer->len; + + encoder->outputBuffer->len = 0; + + return encoder->flushBuffer; + } + + return NULL; +} + + +/* + * IntermediateResultEncoderDone tells the encoder that there is no more work + * to do. Encoder possibly can add footer data at this stage. + */ +StringInfo +IntermediateResultEncoderDone(IntermediateResultEncoder *encoder) +{ + if (encoder->format == TEXT_COPY_FORMAT || + encoder->format == BINARY_COPY_FORMAT) + { + CopyOutState copyOutState = encoder->copyOutState; + if (copyOutState->binary) + { + AppendCopyBinaryFooters(copyOutState); + } + } + + if (encoder->outputBuffer->len > 0) + { + encoder->flushBuffer->data = encoder->outputBuffer->data; + encoder->flushBuffer->len = encoder->outputBuffer->len; + + encoder->outputBuffer->len = 0; + + return encoder->flushBuffer; + } + + return NULL; +} + + +/* + * IntermediateResultEncoderDestroy cleans up resources used by the encoder. 
+ */ +void +IntermediateResultEncoderDestroy(IntermediateResultEncoder *encoder) +{ + if (encoder != NULL && encoder->copyOutState != NULL) /* no encoder before startup */ + { + pfree(encoder->copyOutState); + } + + if (encoder != NULL && encoder->columnOutputFunctions != NULL) + { + pfree(encoder->columnOutputFunctions); + } +} + + +/* + * ReadFileIntoTupleStore parses the records in the given file using the + * given format and puts the rows in the given tuple store. + */ +void +ReadFileIntoTupleStore(char *fileName, IntermediateResultFormat format, + TupleDesc tupleDescriptor, Tuplestorestate *tupstore) +{ + if (format == TEXT_COPY_FORMAT || format == BINARY_COPY_FORMAT) + { + char *copyFormat = (format == TEXT_COPY_FORMAT) ? "text" : "binary"; + ReadCopyFileIntoTupleStore(fileName, copyFormat, tupleDescriptor, tupstore); + } +} + + +/* + * ReadCopyFileIntoTupleStore parses the records in a COPY-formatted file + * according to the given tuple descriptor and stores the records in a tuple + * store. + */ +static void +ReadCopyFileIntoTupleStore(char *fileName, char *copyFormat, + TupleDesc tupleDescriptor, + Tuplestorestate *tupstore) +{ + /* + * Trick BeginCopyFrom into using our tuple descriptor by pretending it belongs + * to a relation. 
+ */ + Relation stubRelation = StubRelation(tupleDescriptor); + + EState *executorState = CreateExecutorState(); + MemoryContext executorTupleContext = GetPerTupleMemoryContext(executorState); + ExprContext *executorExpressionContext = GetPerTupleExprContext(executorState); + + int columnCount = tupleDescriptor->natts; + Datum *columnValues = palloc0(columnCount * sizeof(Datum)); + bool *columnNulls = palloc0(columnCount * sizeof(bool)); + + List *copyOptions = NIL; + + int location = -1; /* "unknown" token location */ + DefElem *copyOption = makeDefElem("format", (Node *) makeString(copyFormat), + location); + copyOptions = lappend(copyOptions, copyOption); + + CopyState copyState = BeginCopyFrom(NULL, stubRelation, fileName, false, NULL, + NULL, copyOptions); + + while (true) + { + ResetPerTupleExprContext(executorState); + MemoryContext oldContext = MemoryContextSwitchTo(executorTupleContext); + + bool nextRowFound = NextCopyFromCompat(copyState, executorExpressionContext, + columnValues, columnNulls); + if (!nextRowFound) + { + MemoryContextSwitchTo(oldContext); + break; + } + + tuplestore_putvalues(tupstore, tupleDescriptor, columnValues, columnNulls); + MemoryContextSwitchTo(oldContext); + } + + EndCopyFrom(copyState); + pfree(columnValues); + pfree(columnNulls); +} + + +/* + * StubRelation creates a stub Relation from the given tuple descriptor. + * To be able to use copy.c, we need a Relation descriptor. As there is no + * relation corresponding to the data loaded from workers, we need to fake one. + * We just need the bare minimal set of fields accessed by BeginCopyFrom(). 
+ */ +static Relation +StubRelation(TupleDesc tupleDescriptor) +{ + Relation stubRelation = palloc0(sizeof(RelationData)); + stubRelation->rd_att = tupleDescriptor; + stubRelation->rd_rel = palloc0(sizeof(FormData_pg_class)); + stubRelation->rd_rel->relkind = RELKIND_RELATION; + + return stubRelation; +} + + +/* + * ResultFileFormatForTupleDesc returns the most suitable encoding format for + * the given tuple descriptor. + */ +IntermediateResultFormat +ResultFileFormatForTupleDesc(TupleDesc tupleDesc) +{ + if (CanUseBinaryCopyFormat(tupleDesc)) + { + return BINARY_COPY_FORMAT; + } + else + { + return TEXT_COPY_FORMAT; + } +} diff --git a/src/backend/distributed/executor/intermediate_results.c b/src/backend/distributed/executor/intermediate_results.c index 2cc875d5d..c0f755636 100644 --- a/src/backend/distributed/executor/intermediate_results.c +++ b/src/backend/distributed/executor/intermediate_results.c @@ -54,9 +54,6 @@ typedef struct RemoteFileDestReceiver const char *resultId; - /* descriptor of the tuples that are sent to the worker */ - TupleDesc tupleDescriptor; - /* EState for per-tuple memory allocation */ EState *executorState; @@ -72,8 +69,7 @@ typedef struct RemoteFileDestReceiver FileCompat fileCompat; /* state on how to copy out data types */ - CopyOutState copyOutState; - FmgrInfo *columnOutputFunctions; + IntermediateResultEncoder *encoder; /* number of tuples sent */ uint64 tuplesSent; @@ -93,13 +89,15 @@ static void RemoteFileDestReceiverDestroy(DestReceiver *destReceiver); static char * IntermediateResultsDirectory(void); static void ReadIntermediateResultsIntoFuncOutput(FunctionCallInfo fcinfo, - char *copyFormat, + IntermediateResultFormat format, Datum *resultIdArray, int resultCount); static uint64 FetchRemoteIntermediateResult(MultiConnection *connection, char *resultId); static CopyStatus CopyDataFromConnection(MultiConnection *connection, FileCompat *fileCompat, uint64 *bytesReceived); +static IntermediateResultFormat 
ParseIntermediateResultFormatLabel(char *formatString); + /* exports for SQL callable functions */ PG_FUNCTION_INFO_V1(read_intermediate_result); @@ -235,26 +233,13 @@ RemoteFileDestReceiverStartup(DestReceiver *dest, int operation, const char *resultId = resultDest->resultId; - const char *delimiterCharacter = "\t"; - const char *nullPrintCharacter = "\\N"; - List *initialNodeList = resultDest->initialNodeList; List *connectionList = NIL; + MemoryContext tupleContext = GetPerTupleMemoryContext(resultDest->executorState); + IntermediateResultFormat format = ResultFileFormatForTupleDesc(inputTupleDescriptor); - resultDest->tupleDescriptor = inputTupleDescriptor; - - /* define how tuples will be serialised */ - CopyOutState copyOutState = (CopyOutState) palloc0(sizeof(CopyOutStateData)); - copyOutState->delim = (char *) delimiterCharacter; - copyOutState->null_print = (char *) nullPrintCharacter; - copyOutState->null_print_client = (char *) nullPrintCharacter; - copyOutState->binary = CanUseBinaryCopyFormat(inputTupleDescriptor); - copyOutState->fe_msgbuf = makeStringInfo(); - copyOutState->rowcontext = GetPerTupleMemoryContext(resultDest->executorState); - resultDest->copyOutState = copyOutState; - - resultDest->columnOutputFunctions = ColumnOutputFunctions(inputTupleDescriptor, - copyOutState->binary); + resultDest->encoder = IntermediateResultEncoderCreate(inputTupleDescriptor, + format, tupleContext); if (resultDest->writeLocalFile) { @@ -322,19 +307,6 @@ RemoteFileDestReceiverStartup(DestReceiver *dest, int operation, PQclear(result); } - if (copyOutState->binary) - { - /* send headers when using binary encoding */ - resetStringInfo(copyOutState->fe_msgbuf); - AppendCopyBinaryHeaders(copyOutState); - BroadcastCopyData(copyOutState->fe_msgbuf, connectionList); - - if (resultDest->writeLocalFile) - { - WriteToLocalFile(copyOutState->fe_msgbuf, &resultDest->fileCompat); - } - } - resultDest->connectionList = connectionList; } @@ -365,13 +337,7 @@ 
RemoteFileDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest) { RemoteFileDestReceiver *resultDest = (RemoteFileDestReceiver *) dest; - TupleDesc tupleDescriptor = resultDest->tupleDescriptor; - List *connectionList = resultDest->connectionList; - CopyOutState copyOutState = resultDest->copyOutState; - FmgrInfo *columnOutputFunctions = resultDest->columnOutputFunctions; - - StringInfo copyData = copyOutState->fe_msgbuf; EState *executorState = resultDest->executorState; MemoryContext executorTupleContext = GetPerTupleMemoryContext(executorState); @@ -382,19 +348,18 @@ RemoteFileDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest) Datum *columnValues = slot->tts_values; bool *columnNulls = slot->tts_isnull; - resetStringInfo(copyData); - - /* construct row in COPY format */ - AppendCopyRowData(columnValues, columnNulls, tupleDescriptor, - copyOutState, columnOutputFunctions, NULL); - - /* send row to nodes */ - BroadcastCopyData(copyData, connectionList); - - /* write to local file (if applicable) */ - if (resultDest->writeLocalFile) + StringInfo bufferToFlush = + IntermediateResultEncoderReceive(resultDest->encoder, columnValues, columnNulls); + if (bufferToFlush != NULL) { - WriteToLocalFile(copyOutState->fe_msgbuf, &resultDest->fileCompat); + /* send buffered rows to nodes */ + BroadcastCopyData(bufferToFlush, connectionList); + + /* write to local file (if applicable) */ + if (resultDest->writeLocalFile) + { + WriteToLocalFile(bufferToFlush, &resultDest->fileCompat); + } } MemoryContextSwitchTo(oldContext); @@ -433,20 +398,18 @@ static void RemoteFileDestReceiverShutdown(DestReceiver *destReceiver) { RemoteFileDestReceiver *resultDest = (RemoteFileDestReceiver *) destReceiver; - List *connectionList = resultDest->connectionList; - CopyOutState copyOutState = resultDest->copyOutState; - if (copyOutState->binary) + StringInfo bufferToFlush = IntermediateResultEncoderDone(resultDest->encoder); + if (bufferToFlush != NULL) { - /* send footers when using 
binary encoding */ - resetStringInfo(copyOutState->fe_msgbuf); - AppendCopyBinaryFooters(copyOutState); - BroadcastCopyData(copyOutState->fe_msgbuf, connectionList); + /* send remaining data to nodes */ + BroadcastCopyData(bufferToFlush, connectionList); + /* write to local file (if applicable) */ if (resultDest->writeLocalFile) { - WriteToLocalFile(copyOutState->fe_msgbuf, &resultDest->fileCompat); + WriteToLocalFile(bufferToFlush, &resultDest->fileCompat); } } @@ -497,15 +460,7 @@ RemoteFileDestReceiverDestroy(DestReceiver *destReceiver) { RemoteFileDestReceiver *resultDest = (RemoteFileDestReceiver *) destReceiver; - if (resultDest->copyOutState) - { - pfree(resultDest->copyOutState); - } - - if (resultDest->columnOutputFunctions) - { - pfree(resultDest->columnOutputFunctions); - } + IntermediateResultEncoderDestroy(resultDest->encoder); pfree(resultDest); } @@ -708,13 +663,14 @@ Datum read_intermediate_result(PG_FUNCTION_ARGS) { Datum resultId = PG_GETARG_DATUM(0); - Datum copyFormatOidDatum = PG_GETARG_DATUM(1); - Datum copyFormatLabelDatum = DirectFunctionCall1(enum_out, copyFormatOidDatum); - char *copyFormatLabel = DatumGetCString(copyFormatLabelDatum); + Datum formatOidDatum = PG_GETARG_DATUM(1); + Datum formatLabelDatum = DirectFunctionCall1(enum_out, formatOidDatum); + char *formatLabel = DatumGetCString(formatLabelDatum); + IntermediateResultFormat format = ParseIntermediateResultFormatLabel(formatLabel); CheckCitusVersion(ERROR); - ReadIntermediateResultsIntoFuncOutput(fcinfo, copyFormatLabel, &resultId, 1); + ReadIntermediateResultsIntoFuncOutput(fcinfo, format, &resultId, 1); PG_RETURN_DATUM(0); } @@ -733,8 +689,9 @@ read_intermediate_result_array(PG_FUNCTION_ARGS) ArrayType *resultIdObject = PG_GETARG_ARRAYTYPE_P(0); Datum copyFormatOidDatum = PG_GETARG_DATUM(1); - Datum copyFormatLabelDatum = DirectFunctionCall1(enum_out, copyFormatOidDatum); - char *copyFormatLabel = DatumGetCString(copyFormatLabelDatum); + Datum formatLabelDatum = 
DirectFunctionCall1(enum_out, copyFormatOidDatum); + char *formatLabel = DatumGetCString(formatLabelDatum); + IntermediateResultFormat format = ParseIntermediateResultFormatLabel(formatLabel); CheckCitusVersion(ERROR); @@ -742,7 +699,7 @@ read_intermediate_result_array(PG_FUNCTION_ARGS) resultIdObject)); Datum *resultIdArray = DeconstructArrayObject(resultIdObject); - ReadIntermediateResultsIntoFuncOutput(fcinfo, copyFormatLabel, + ReadIntermediateResultsIntoFuncOutput(fcinfo, format, resultIdArray, resultCount); PG_RETURN_DATUM(0); @@ -755,7 +712,8 @@ read_intermediate_result_array(PG_FUNCTION_ARGS) * don't exist. */ static void -ReadIntermediateResultsIntoFuncOutput(FunctionCallInfo fcinfo, char *copyFormat, +ReadIntermediateResultsIntoFuncOutput(FunctionCallInfo fcinfo, + IntermediateResultFormat format, Datum *resultIdArray, int resultCount) { TupleDesc tupleDescriptor = NULL; @@ -774,7 +732,7 @@ ReadIntermediateResultsIntoFuncOutput(FunctionCallInfo fcinfo, char *copyFormat, errmsg("result \"%s\" does not exist", resultId))); } - ReadFileIntoTupleStore(resultFileName, copyFormat, tupleDescriptor, tupleStore); + ReadFileIntoTupleStore(resultFileName, format, tupleDescriptor, tupleStore); } tuplestore_donestoring(tupleStore); @@ -1003,3 +961,21 @@ CopyDataFromConnection(MultiConnection *connection, FileCompat *fileCompat, return CLIENT_COPY_FAILED; } } + + +static IntermediateResultFormat +ParseIntermediateResultFormatLabel(char *formatString) +{ + if (strncmp(formatString, "text", NAMEDATALEN) == 0 || + strncmp(formatString, "csv", NAMEDATALEN) == 0) /* "csv" is treated as text */ + { + return TEXT_COPY_FORMAT; + } + else if (strncmp(formatString, "binary", NAMEDATALEN) == 0) + { + return BINARY_COPY_FORMAT; + } + + ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("invalid intermediate result format: %s", formatString))); +} diff --git a/src/backend/distributed/executor/multi_executor.c b/src/backend/distributed/executor/multi_executor.c index 82d177054..a9efec4c9 100644 
--- a/src/backend/distributed/executor/multi_executor.c +++ b/src/backend/distributed/executor/multi_executor.c @@ -11,6 +11,7 @@ #include "postgres.h" #include "miscadmin.h" +#include "pgstat.h" #include "access/xact.h" #include "catalog/dependency.h" @@ -22,6 +23,7 @@ #include "distributed/insert_select_executor.h" #include "distributed/insert_select_planner.h" #include "distributed/listutils.h" +#include "distributed/intermediate_results.h" #include "distributed/master_protocol.h" #include "distributed/multi_executor.h" #include "distributed/multi_master_planner.h" @@ -31,6 +33,7 @@ #include "distributed/multi_server_executor.h" #include "distributed/resource_lock.h" #include "distributed/transaction_management.h" +#include "distributed/transmit.h" #include "distributed/worker_shard_visibility.h" #include "distributed/worker_protocol.h" #include "executor/execdebug.h" @@ -68,7 +71,6 @@ int ExecutorLevel = 0; /* local function forward declarations */ -static Relation StubRelation(TupleDesc tupleDescriptor); static bool AlterTableConstraintCheck(QueryDesc *queryDesc); static bool IsLocalReferenceTableJoinPlan(PlannedStmt *plan); static List * FindCitusCustomScanStates(PlanState *planState); @@ -388,7 +390,7 @@ LoadTuplesIntoTupleStore(CitusScanState *citusScanState, Job *workerJob) List *workerTaskList = workerJob->taskList; bool randomAccess = true; bool interTransactions = false; - char *copyFormat = "text"; + IntermediateResultFormat format = TEXT_COPY_FORMAT; TupleDesc tupleDescriptor = ScanStateGetTupleDescriptor(citusScanState); @@ -398,7 +400,7 @@ LoadTuplesIntoTupleStore(CitusScanState *citusScanState, Job *workerJob) if (BinaryMasterCopyFormat) { - copyFormat = "binary"; + format = BINARY_COPY_FORMAT; } Task *workerTask = NULL; @@ -407,7 +409,7 @@ LoadTuplesIntoTupleStore(CitusScanState *citusScanState, Job *workerJob) StringInfo jobDirectoryName = MasterJobDirectoryName(workerTask->jobId); StringInfo taskFilename = TaskFilename(jobDirectoryName, 
workerTask->taskId); - ReadFileIntoTupleStore(taskFilename->data, copyFormat, tupleDescriptor, + ReadFileIntoTupleStore(taskFilename->data, format, tupleDescriptor, citusScanState->tuplestorestate); } @@ -415,62 +417,6 @@ LoadTuplesIntoTupleStore(CitusScanState *citusScanState, Job *workerJob) } -/* - * ReadFileIntoTupleStore parses the records in a COPY-formatted file according - * according to the given tuple descriptor and stores the records in a tuple - * store. - */ -void -ReadFileIntoTupleStore(char *fileName, char *copyFormat, TupleDesc tupleDescriptor, - Tuplestorestate *tupstore) -{ - /* - * Trick BeginCopyFrom into using our tuple descriptor by pretending it belongs - * to a relation. - */ - Relation stubRelation = StubRelation(tupleDescriptor); - - EState *executorState = CreateExecutorState(); - MemoryContext executorTupleContext = GetPerTupleMemoryContext(executorState); - ExprContext *executorExpressionContext = GetPerTupleExprContext(executorState); - - int columnCount = tupleDescriptor->natts; - Datum *columnValues = palloc0(columnCount * sizeof(Datum)); - bool *columnNulls = palloc0(columnCount * sizeof(bool)); - - List *copyOptions = NIL; - - int location = -1; /* "unknown" token location */ - DefElem *copyOption = makeDefElem("format", (Node *) makeString(copyFormat), - location); - copyOptions = lappend(copyOptions, copyOption); - - CopyState copyState = BeginCopyFrom(NULL, stubRelation, fileName, false, NULL, - NULL, copyOptions); - - while (true) - { - ResetPerTupleExprContext(executorState); - MemoryContext oldContext = MemoryContextSwitchTo(executorTupleContext); - - bool nextRowFound = NextCopyFromCompat(copyState, executorExpressionContext, - columnValues, columnNulls); - if (!nextRowFound) - { - MemoryContextSwitchTo(oldContext); - break; - } - - tuplestore_putvalues(tupstore, tupleDescriptor, columnValues, columnNulls); - MemoryContextSwitchTo(oldContext); - } - - EndCopyFrom(copyState); - pfree(columnValues); - pfree(columnNulls); -} - 
- /* * SortTupleStore gets a CitusScanState and sorts the tuplestore by all the * entries in the target entry list, starting from the first one and @@ -568,24 +514,6 @@ SortTupleStore(CitusScanState *scanState) } -/* - * StubRelation creates a stub Relation from the given tuple descriptor. - * To be able to use copy.c, we need a Relation descriptor. As there is no - * relation corresponding to the data loaded from workers, we need to fake one. - * We just need the bare minimal set of fields accessed by BeginCopyFrom(). - */ -static Relation -StubRelation(TupleDesc tupleDescriptor) -{ - Relation stubRelation = palloc0(sizeof(RelationData)); - stubRelation->rd_att = tupleDescriptor; - stubRelation->rd_rel = palloc0(sizeof(FormData_pg_class)); - stubRelation->rd_rel->relkind = RELKIND_RELATION; - - return stubRelation; -} - - /* * ExecuteQueryStringIntoDestReceiver plans and executes a query and sends results * to the given DestReceiver. diff --git a/src/backend/distributed/executor/partitioned_intermediate_results.c b/src/backend/distributed/executor/partitioned_intermediate_results.c index 1c20daddf..0183f557f 100644 --- a/src/backend/distributed/executor/partitioned_intermediate_results.c +++ b/src/backend/distributed/executor/partitioned_intermediate_results.c @@ -460,8 +460,11 @@ PartitionedResultDestReceiverReceive(TupleTableSlot *slot, DestReceiver *copyDes partitionIndex); char *filePath = QueryResultFileName(resultId->data); + IntermediateResultFormat format = + partitionedDest->binaryCopy ? 
BINARY_COPY_FORMAT : TEXT_COPY_FORMAT; + partitionDest = CreateFileDestReceiver(filePath, partitionedDest->perTupleContext, - partitionedDest->binaryCopy); + format); partitionedDest->partitionDestReceivers[partitionIndex] = partitionDest; partitionDest->rStartup(partitionDest, 0, partitionedDest->tupleDescriptor); } diff --git a/src/backend/distributed/worker/worker_sql_task_protocol.c b/src/backend/distributed/worker/worker_sql_task_protocol.c index 46ef8f147..d6d987361 100644 --- a/src/backend/distributed/worker/worker_sql_task_protocol.c +++ b/src/backend/distributed/worker/worker_sql_task_protocol.c @@ -44,11 +44,10 @@ typedef struct TaskFileDestReceiver /* output file */ char *filePath; FileCompat fileCompat; - bool binaryCopyFormat; + IntermediateResultFormat format; /* state on how to copy out data types */ - CopyOutState copyOutState; - FmgrInfo *columnOutputFunctions; + IntermediateResultEncoder *encoder; /* statistics */ uint64 tuplesSent; @@ -102,11 +101,14 @@ WorkerExecuteSqlTask(Query *query, char *taskFilename, bool binaryCopyFormat) { ParamListInfo paramListInfo = NULL; + IntermediateResultFormat format = + binaryCopyFormat ? BINARY_COPY_FORMAT : TEXT_COPY_FORMAT; + EState *estate = CreateExecutorState(); MemoryContext tupleContext = GetPerTupleMemoryContext(estate); TaskFileDestReceiver *taskFileDest = (TaskFileDestReceiver *) CreateFileDestReceiver(taskFilename, tupleContext, - binaryCopyFormat); + format); ExecuteQueryIntoDestReceiver(query, paramListInfo, (DestReceiver *) taskFileDest); @@ -124,7 +126,8 @@ WorkerExecuteSqlTask(Query *query, char *taskFilename, bool binaryCopyFormat) * to a file. 
*/ DestReceiver * -CreateFileDestReceiver(char *filePath, MemoryContext tupleContext, bool binaryCopyFormat) +CreateFileDestReceiver(char *filePath, MemoryContext tupleContext, + IntermediateResultFormat format) { TaskFileDestReceiver *taskFileDest = (TaskFileDestReceiver *) palloc0( sizeof(TaskFileDestReceiver)); @@ -140,7 +143,7 @@ CreateFileDestReceiver(char *filePath, MemoryContext tupleContext, bool binaryCo taskFileDest->tupleContext = tupleContext; taskFileDest->memoryContext = CurrentMemoryContext; taskFileDest->filePath = pstrdup(filePath); - taskFileDest->binaryCopyFormat = binaryCopyFormat; + taskFileDest->format = format; return (DestReceiver *) taskFileDest; } @@ -157,9 +160,6 @@ TaskFileDestReceiverStartup(DestReceiver *dest, int operation, { TaskFileDestReceiver *taskFileDest = (TaskFileDestReceiver *) dest; - const char *delimiterCharacter = "\t"; - const char *nullPrintCharacter = "\\N"; - const int fileFlags = (O_APPEND | O_CREAT | O_RDWR | O_TRUNC | PG_BINARY); const int fileMode = (S_IRUSR | S_IWUSR); @@ -169,29 +169,15 @@ TaskFileDestReceiverStartup(DestReceiver *dest, int operation, taskFileDest->tupleDescriptor = inputTupleDescriptor; /* define how tuples will be serialised */ - CopyOutState copyOutState = (CopyOutState) palloc0(sizeof(CopyOutStateData)); - copyOutState->delim = (char *) delimiterCharacter; - copyOutState->null_print = (char *) nullPrintCharacter; - copyOutState->null_print_client = (char *) nullPrintCharacter; - copyOutState->binary = taskFileDest->binaryCopyFormat; - copyOutState->fe_msgbuf = makeStringInfo(); - copyOutState->rowcontext = taskFileDest->tupleContext; - taskFileDest->copyOutState = copyOutState; - - taskFileDest->columnOutputFunctions = ColumnOutputFunctions(inputTupleDescriptor, - copyOutState->binary); + taskFileDest->encoder = IntermediateResultEncoderCreate(inputTupleDescriptor, + taskFileDest->format, + taskFileDest->tupleContext); taskFileDest->fileCompat = FileCompatFromFileStart(FileOpenForTransmit( 
taskFileDest->filePath, fileFlags, fileMode)); - if (copyOutState->binary) - { - /* write headers when using binary encoding */ - AppendCopyBinaryHeaders(copyOutState); - } - MemoryContextSwitchTo(oldContext); } @@ -206,12 +192,7 @@ TaskFileDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest) { TaskFileDestReceiver *taskFileDest = (TaskFileDestReceiver *) dest; - TupleDesc tupleDescriptor = taskFileDest->tupleDescriptor; - - CopyOutState copyOutState = taskFileDest->copyOutState; - FmgrInfo *columnOutputFunctions = taskFileDest->columnOutputFunctions; - - StringInfo copyData = copyOutState->fe_msgbuf; + IntermediateResultEncoder *encoder = taskFileDest->encoder; MemoryContext executorTupleContext = taskFileDest->tupleContext; MemoryContext oldContext = MemoryContextSwitchTo(executorTupleContext); @@ -221,14 +202,11 @@ TaskFileDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest) Datum *columnValues = slot->tts_values; bool *columnNulls = slot->tts_isnull; - /* construct row in COPY format */ - AppendCopyRowData(columnValues, columnNulls, tupleDescriptor, - copyOutState, columnOutputFunctions, NULL); - - if (copyData->len > COPY_BUFFER_SIZE) + StringInfo bufferToFlush = IntermediateResultEncoderReceive(encoder, columnValues, + columnNulls); + if (bufferToFlush != NULL) { - WriteToLocalFile(copyOutState->fe_msgbuf, taskFileDest); - resetStringInfo(copyData); + WriteToLocalFile(bufferToFlush, taskFileDest); } MemoryContextSwitchTo(oldContext); @@ -268,20 +246,12 @@ static void TaskFileDestReceiverShutdown(DestReceiver *destReceiver) { TaskFileDestReceiver *taskFileDest = (TaskFileDestReceiver *) destReceiver; - CopyOutState copyOutState = taskFileDest->copyOutState; - if (copyOutState->fe_msgbuf->len > 0) + IntermediateResultEncoder *encoder = taskFileDest->encoder; + StringInfo bufferToFlush = IntermediateResultEncoderDone(encoder); + if (bufferToFlush != NULL) { - WriteToLocalFile(copyOutState->fe_msgbuf, taskFileDest); - 
resetStringInfo(copyOutState->fe_msgbuf); - } - - if (copyOutState->binary) - { - /* write footers when using binary encoding */ - AppendCopyBinaryFooters(copyOutState); - WriteToLocalFile(copyOutState->fe_msgbuf, taskFileDest); - resetStringInfo(copyOutState->fe_msgbuf); + WriteToLocalFile(bufferToFlush, taskFileDest); } FileClose(taskFileDest->fileCompat.fd); @@ -297,15 +267,7 @@ TaskFileDestReceiverDestroy(DestReceiver *destReceiver) { TaskFileDestReceiver *taskFileDest = (TaskFileDestReceiver *) destReceiver; - if (taskFileDest->copyOutState) - { - pfree(taskFileDest->copyOutState); - } - - if (taskFileDest->columnOutputFunctions) - { - pfree(taskFileDest->columnOutputFunctions); - } + IntermediateResultEncoderDestroy(taskFileDest->encoder); pfree(taskFileDest->filePath); pfree(taskFileDest); diff --git a/src/include/distributed/intermediate_results.h b/src/include/distributed/intermediate_results.h index 73b25460e..5ab586690 100644 --- a/src/include/distributed/intermediate_results.h +++ b/src/include/distributed/intermediate_results.h @@ -48,6 +48,24 @@ typedef struct DistributedResultFragment } DistributedResultFragment; +/* constants for intermediate result encoding formats */ +typedef enum IntermediateResultFormat +{ + TEXT_COPY_FORMAT, + BINARY_COPY_FORMAT +} IntermediateResultFormat; + + +/* + * IntermediateResultEncoder represents encoder state for intermediate result + * files. Create it with IntermediateResultEncoderCreate(), encode each row + * with IntermediateResultEncoderReceive(), and finish the encoding with + * IntermediateResultEncoderDone(). Both calls return a buffer for the caller + * to write out, or NULL when there is nothing to flush. 
+ */ +typedef struct IntermediateResultEncoder IntermediateResultEncoder; + + /* intermediate_results.c */ extern DestReceiver * CreateRemoteFileDestReceiver(const char *resultId, EState *executorState, @@ -60,6 +78,19 @@ extern int64 IntermediateResultSize(const char *resultId); extern char * QueryResultFileName(const char *resultId); extern char * CreateIntermediateResultsDirectory(void); +/* encoding intermediate result files */ +extern IntermediateResultEncoder * IntermediateResultEncoderCreate(TupleDesc tupleDesc, + IntermediateResultFormat + format, MemoryContext + tupleContext); +extern StringInfo IntermediateResultEncoderReceive(IntermediateResultEncoder *encoder, + Datum *values, bool *nulls); +extern StringInfo IntermediateResultEncoderDone(IntermediateResultEncoder *encoder); +extern void IntermediateResultEncoderDestroy(IntermediateResultEncoder *encoder); +extern void ReadFileIntoTupleStore(char *fileName, IntermediateResultFormat format, + TupleDesc tupleDescriptor, Tuplestorestate *tupstore); +extern IntermediateResultFormat ResultFileFormatForTupleDesc(TupleDesc tupleDesc); + /* distributed_intermediate_results.c */ extern List ** RedistributeTaskListResults(const char *resultIdPrefix, List *selectTaskList, diff --git a/src/include/distributed/multi_executor.h b/src/include/distributed/multi_executor.h index 448158d59..17b7ed62c 100644 --- a/src/include/distributed/multi_executor.h +++ b/src/include/distributed/multi_executor.h @@ -92,8 +92,6 @@ extern bool IsCitusCustomState(PlanState *planState); extern TupleTableSlot * CitusExecScan(CustomScanState *node); extern TupleTableSlot * ReturnTupleFromTuplestore(CitusScanState *scanState); extern void LoadTuplesIntoTupleStore(CitusScanState *citusScanState, Job *workerJob); -extern void ReadFileIntoTupleStore(char *fileName, char *copyFormat, TupleDesc - tupleDescriptor, Tuplestorestate *tupstore); extern Query * ParseQueryString(const char *queryString, Oid *paramOids, int numParams); extern Query * 
RewriteRawQueryStmt(RawStmt *rawStmt, const char *queryString, Oid *paramOids, int numParams); diff --git a/src/include/distributed/worker_protocol.h b/src/include/distributed/worker_protocol.h index 3e1f7c426..8c9245882 100644 --- a/src/include/distributed/worker_protocol.h +++ b/src/include/distributed/worker_protocol.h @@ -17,6 +17,7 @@ #include "postgres.h" #include "fmgr.h" +#include "distributed/intermediate_results.h" #include "distributed/shardinterval_utils.h" #include "lib/stringinfo.h" #include "nodes/parsenodes.h" @@ -138,7 +139,7 @@ extern CreateStmt * CreateStatement(RangeVar *relation, List *columnDefinitionLi extern CopyStmt * CopyStatement(RangeVar *relation, char *sourceFilename); extern DestReceiver * CreateFileDestReceiver(char *filePath, MemoryContext tupleContext, - bool binaryCopyFormat); + IntermediateResultFormat format); extern void FileDestReceiverStats(DestReceiver *dest, uint64 *rowsSent, uint64 *bytesSent);