Refactor intermediate result (de-)serialize methods

pull/3490/head
Hadi Moshayedi 2020-02-06 14:47:55 -08:00
parent 99c5b0add7
commit a29292652b
8 changed files with 445 additions and 222 deletions

View File

@ -0,0 +1,324 @@
/*-------------------------------------------------------------------------
*
* intermediate_result_encoder.c
* Functions for encoding and decoding intermediate results.
*
* Copyright (c) Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#include <sys/stat.h>
#include <unistd.h>
#include "postgres.h"
#include "funcapi.h"
#include "libpq-fe.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "catalog/pg_enum.h"
#include "commands/copy.h"
#include "distributed/commands/multi_copy.h"
#include "distributed/connection_management.h"
#include "distributed/intermediate_results.h"
#include "distributed/master_metadata_utility.h"
#include "distributed/metadata_cache.h"
#include "distributed/multi_client_executor.h"
#include "distributed/multi_executor.h"
#include "distributed/remote_commands.h"
#include "distributed/transmit.h"
#include "distributed/transaction_identifier.h"
#include "distributed/tuplestore.h"
#include "distributed/version_compat.h"
#include "distributed/worker_protocol.h"
#include "mb/pg_wchar.h"
#include "nodes/makefuncs.h"
#include "nodes/parsenodes.h"
#include "nodes/primnodes.h"
#include "storage/fd.h"
#include "tcop/tcopprot.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/syscache.h"
#define ENCODER_BUFFER_SIZE (4 * 1024 * 1024)
/* internal state of intermediate result file encoder */
struct IntermediateResultEncoder
{
/*
* Result of encoding is accumulated in outputBuffer, until it is flushed.
* The flush is done when the data is returned as result of
* IntermediateResultEncoderReceive() and IntermediateResultEncoderDone()
* functions.
*/
StringInfo outputBuffer;
/*
* Used for returning the flushed result of encoding, at which time move the
* data pointer from outputBuffer to flushBuffer before reseting length of
* outputBuffer.
*
* This is kept here to avoid allocating it everytime we need to flush some data.
*/
StringInfo flushBuffer;
IntermediateResultFormat format;
TupleDesc tupleDescriptor;
/* used when format is *_COPY_FORMAT */
CopyOutState copyOutState;
FmgrInfo *columnOutputFunctions;
};
/* forward declaration of local functions */
static void ReadCopyFileIntoTupleStore(char *fileName, char *copyFormat,
TupleDesc tupleDescriptor,
Tuplestorestate *tupstore);
static Relation StubRelation(TupleDesc tupleDescriptor);
/*
* IntermediateResultEncoderCreate returns an encoder which can encode tuples of
* given tupleDesc form using the given format. Encoder possibly can add header
* data at this stage.
*/
IntermediateResultEncoder *
IntermediateResultEncoderCreate(TupleDesc tupleDesc,
IntermediateResultFormat format,
MemoryContext tupleContext)
{
IntermediateResultEncoder *encoder = palloc0(sizeof(IntermediateResultEncoder));
encoder->format = format;
encoder->tupleDescriptor = CreateTupleDescCopy(tupleDesc);
encoder->outputBuffer = makeStringInfo();
encoder->flushBuffer = makeStringInfo();
if (format == TEXT_COPY_FORMAT || format == BINARY_COPY_FORMAT)
{
int fileEncoding = pg_get_client_encoding();
int databaseEncoding = GetDatabaseEncoding();
int databaseEncodingMaxLength = pg_database_encoding_max_length();
char *nullPrint = "\\N";
int nullPrintLen = strlen(nullPrint);
char *nullPrintClient = pg_server_to_any(nullPrint, nullPrintLen, fileEncoding);
CopyOutState copyOutState = palloc0(sizeof(CopyOutStateData));
copyOutState->delim = "\t";
copyOutState->null_print = nullPrint;
copyOutState->null_print_client = nullPrintClient;
copyOutState->binary = (format == BINARY_COPY_FORMAT);
copyOutState->fe_msgbuf = encoder->outputBuffer;
copyOutState->rowcontext = tupleContext;
if (PG_ENCODING_IS_CLIENT_ONLY(fileEncoding))
{
ereport(ERROR, (errmsg("cannot repartition into encoding caller "
"cannot receive")));
}
/* set up transcoding information and default text output characters */
copyOutState->need_transcoding = (fileEncoding != databaseEncoding) ||
(databaseEncodingMaxLength > 1);
encoder->copyOutState = copyOutState;
encoder->columnOutputFunctions =
ColumnOutputFunctions(tupleDesc, copyOutState->binary);
if (copyOutState->binary)
{
AppendCopyBinaryHeaders(copyOutState);
}
}
return encoder;
}
/*
* IntermediateResultEncoderReceive encodes the next row with the given encoder.
*/
StringInfo
IntermediateResultEncoderReceive(IntermediateResultEncoder *encoder,
Datum *values, bool *nulls)
{
if (encoder->format == TEXT_COPY_FORMAT ||
encoder->format == BINARY_COPY_FORMAT)
{
AppendCopyRowData(values, nulls, encoder->tupleDescriptor,
encoder->copyOutState, encoder->columnOutputFunctions, NULL);
}
if (encoder->outputBuffer->len > ENCODER_BUFFER_SIZE)
{
encoder->flushBuffer->data = encoder->outputBuffer->data;
encoder->flushBuffer->len = encoder->outputBuffer->len;
encoder->outputBuffer->len = 0;
return encoder->flushBuffer;
}
return NULL;
}
/*
* IntermediateResultEncoderDone tells the encoder that there is no more work
* to do. Encoder possibly can add footer data at this stage.
*/
StringInfo
IntermediateResultEncoderDone(IntermediateResultEncoder *encoder)
{
if (encoder->format == TEXT_COPY_FORMAT ||
encoder->format == BINARY_COPY_FORMAT)
{
CopyOutState copyOutState = encoder->copyOutState;
if (copyOutState->binary)
{
AppendCopyBinaryFooters(copyOutState);
}
}
if (encoder->outputBuffer->len > 0)
{
encoder->flushBuffer->data = encoder->outputBuffer->data;
encoder->flushBuffer->len = encoder->outputBuffer->len;
encoder->outputBuffer->len = 0;
return encoder->flushBuffer;
}
return NULL;
}
/*
* IntermediateResultEncoderDestroy cleans up resources used by the encoder.
*/
void
IntermediateResultEncoderDestroy(IntermediateResultEncoder *encoder)
{
if (encoder->copyOutState != NULL)
{
pfree(encoder->copyOutState);
}
if (encoder->columnOutputFunctions != NULL)
{
pfree(encoder->columnOutputFunctions);
}
}
/*
* ReadFileIntoTupleStore parses the records in the given file using the
* given format and puts the rows in the given tuple store.
*/
void
ReadFileIntoTupleStore(char *fileName, IntermediateResultFormat format,
TupleDesc tupleDescriptor, Tuplestorestate *tupstore)
{
if (format == TEXT_COPY_FORMAT || format == BINARY_COPY_FORMAT)
{
char *copyFormat = (format == TEXT_COPY_FORMAT) ? "text" : "binary";
ReadCopyFileIntoTupleStore(fileName, copyFormat, tupleDescriptor, tupstore);
}
}
/*
* ReadCopyFileIntoTupleStore parses the records in a COPY-formatted file
* according to the given tuple descriptor and stores the records in a tuple
* store.
*/
static void
ReadCopyFileIntoTupleStore(char *fileName, char *copyFormat,
TupleDesc tupleDescriptor,
Tuplestorestate *tupstore)
{
/*
* Trick BeginCopyFrom into using our tuple descriptor by pretending it belongs
* to a relation.
*/
Relation stubRelation = StubRelation(tupleDescriptor);
EState *executorState = CreateExecutorState();
MemoryContext executorTupleContext = GetPerTupleMemoryContext(executorState);
ExprContext *executorExpressionContext = GetPerTupleExprContext(executorState);
int columnCount = tupleDescriptor->natts;
Datum *columnValues = palloc0(columnCount * sizeof(Datum));
bool *columnNulls = palloc0(columnCount * sizeof(bool));
List *copyOptions = NIL;
int location = -1; /* "unknown" token location */
DefElem *copyOption = makeDefElem("format", (Node *) makeString(copyFormat),
location);
copyOptions = lappend(copyOptions, copyOption);
CopyState copyState = BeginCopyFrom(NULL, stubRelation, fileName, false, NULL,
NULL, copyOptions);
while (true)
{
ResetPerTupleExprContext(executorState);
MemoryContext oldContext = MemoryContextSwitchTo(executorTupleContext);
bool nextRowFound = NextCopyFromCompat(copyState, executorExpressionContext,
columnValues, columnNulls);
if (!nextRowFound)
{
MemoryContextSwitchTo(oldContext);
break;
}
tuplestore_putvalues(tupstore, tupleDescriptor, columnValues, columnNulls);
MemoryContextSwitchTo(oldContext);
}
EndCopyFrom(copyState);
pfree(columnValues);
pfree(columnNulls);
}
/*
* StubRelation creates a stub Relation from the given tuple descriptor.
* To be able to use copy.c, we need a Relation descriptor. As there is no
* relation corresponding to the data loaded from workers, we need to fake one.
* We just need the bare minimal set of fields accessed by BeginCopyFrom().
*/
static Relation
StubRelation(TupleDesc tupleDescriptor)
{
Relation stubRelation = palloc0(sizeof(RelationData));
stubRelation->rd_att = tupleDescriptor;
stubRelation->rd_rel = palloc0(sizeof(FormData_pg_class));
stubRelation->rd_rel->relkind = RELKIND_RELATION;
return stubRelation;
}
/*
* ResultFileFormatForTupleDesc returns the most suitable encoding format for
* the given tuple descriptor.
*/
IntermediateResultFormat
ResultFileFormatForTupleDesc(TupleDesc tupleDesc)
{
if (CanUseBinaryCopyFormat(tupleDesc))
{
return BINARY_COPY_FORMAT;
}
else
{
return TEXT_COPY_FORMAT;
}
}

View File

@ -54,9 +54,6 @@ typedef struct RemoteFileDestReceiver
const char *resultId; const char *resultId;
/* descriptor of the tuples that are sent to the worker */
TupleDesc tupleDescriptor;
/* EState for per-tuple memory allocation */ /* EState for per-tuple memory allocation */
EState *executorState; EState *executorState;
@ -72,8 +69,7 @@ typedef struct RemoteFileDestReceiver
FileCompat fileCompat; FileCompat fileCompat;
/* state on how to copy out data types */ /* state on how to copy out data types */
CopyOutState copyOutState; IntermediateResultEncoder *encoder;
FmgrInfo *columnOutputFunctions;
/* number of tuples sent */ /* number of tuples sent */
uint64 tuplesSent; uint64 tuplesSent;
@ -93,13 +89,15 @@ static void RemoteFileDestReceiverDestroy(DestReceiver *destReceiver);
static char * IntermediateResultsDirectory(void); static char * IntermediateResultsDirectory(void);
static void ReadIntermediateResultsIntoFuncOutput(FunctionCallInfo fcinfo, static void ReadIntermediateResultsIntoFuncOutput(FunctionCallInfo fcinfo,
char *copyFormat, IntermediateResultFormat format,
Datum *resultIdArray, Datum *resultIdArray,
int resultCount); int resultCount);
static uint64 FetchRemoteIntermediateResult(MultiConnection *connection, char *resultId); static uint64 FetchRemoteIntermediateResult(MultiConnection *connection, char *resultId);
static CopyStatus CopyDataFromConnection(MultiConnection *connection, static CopyStatus CopyDataFromConnection(MultiConnection *connection,
FileCompat *fileCompat, FileCompat *fileCompat,
uint64 *bytesReceived); uint64 *bytesReceived);
static IntermediateResultFormat ParseIntermediateResultFormatLabel(char *formatString);
/* exports for SQL callable functions */ /* exports for SQL callable functions */
PG_FUNCTION_INFO_V1(read_intermediate_result); PG_FUNCTION_INFO_V1(read_intermediate_result);
@ -235,26 +233,13 @@ RemoteFileDestReceiverStartup(DestReceiver *dest, int operation,
const char *resultId = resultDest->resultId; const char *resultId = resultDest->resultId;
const char *delimiterCharacter = "\t";
const char *nullPrintCharacter = "\\N";
List *initialNodeList = resultDest->initialNodeList; List *initialNodeList = resultDest->initialNodeList;
List *connectionList = NIL; List *connectionList = NIL;
MemoryContext tupleContext = GetPerTupleMemoryContext(resultDest->executorState);
IntermediateResultFormat format = ResultFileFormatForTupleDesc(inputTupleDescriptor);
resultDest->tupleDescriptor = inputTupleDescriptor; resultDest->encoder = IntermediateResultEncoderCreate(inputTupleDescriptor,
format, tupleContext);
/* define how tuples will be serialised */
CopyOutState copyOutState = (CopyOutState) palloc0(sizeof(CopyOutStateData));
copyOutState->delim = (char *) delimiterCharacter;
copyOutState->null_print = (char *) nullPrintCharacter;
copyOutState->null_print_client = (char *) nullPrintCharacter;
copyOutState->binary = CanUseBinaryCopyFormat(inputTupleDescriptor);
copyOutState->fe_msgbuf = makeStringInfo();
copyOutState->rowcontext = GetPerTupleMemoryContext(resultDest->executorState);
resultDest->copyOutState = copyOutState;
resultDest->columnOutputFunctions = ColumnOutputFunctions(inputTupleDescriptor,
copyOutState->binary);
if (resultDest->writeLocalFile) if (resultDest->writeLocalFile)
{ {
@ -322,19 +307,6 @@ RemoteFileDestReceiverStartup(DestReceiver *dest, int operation,
PQclear(result); PQclear(result);
} }
if (copyOutState->binary)
{
/* send headers when using binary encoding */
resetStringInfo(copyOutState->fe_msgbuf);
AppendCopyBinaryHeaders(copyOutState);
BroadcastCopyData(copyOutState->fe_msgbuf, connectionList);
if (resultDest->writeLocalFile)
{
WriteToLocalFile(copyOutState->fe_msgbuf, &resultDest->fileCompat);
}
}
resultDest->connectionList = connectionList; resultDest->connectionList = connectionList;
} }
@ -365,13 +337,7 @@ RemoteFileDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest)
{ {
RemoteFileDestReceiver *resultDest = (RemoteFileDestReceiver *) dest; RemoteFileDestReceiver *resultDest = (RemoteFileDestReceiver *) dest;
TupleDesc tupleDescriptor = resultDest->tupleDescriptor;
List *connectionList = resultDest->connectionList; List *connectionList = resultDest->connectionList;
CopyOutState copyOutState = resultDest->copyOutState;
FmgrInfo *columnOutputFunctions = resultDest->columnOutputFunctions;
StringInfo copyData = copyOutState->fe_msgbuf;
EState *executorState = resultDest->executorState; EState *executorState = resultDest->executorState;
MemoryContext executorTupleContext = GetPerTupleMemoryContext(executorState); MemoryContext executorTupleContext = GetPerTupleMemoryContext(executorState);
@ -382,19 +348,18 @@ RemoteFileDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest)
Datum *columnValues = slot->tts_values; Datum *columnValues = slot->tts_values;
bool *columnNulls = slot->tts_isnull; bool *columnNulls = slot->tts_isnull;
resetStringInfo(copyData); StringInfo bufferToFlush =
IntermediateResultEncoderReceive(resultDest->encoder, columnValues, columnNulls);
/* construct row in COPY format */ if (bufferToFlush != NULL)
AppendCopyRowData(columnValues, columnNulls, tupleDescriptor, {
copyOutState, columnOutputFunctions, NULL);
/* send row to nodes */ /* send row to nodes */
BroadcastCopyData(copyData, connectionList); BroadcastCopyData(bufferToFlush, connectionList);
/* write to local file (if applicable) */ /* write to local file (if applicable) */
if (resultDest->writeLocalFile) if (resultDest->writeLocalFile)
{ {
WriteToLocalFile(copyOutState->fe_msgbuf, &resultDest->fileCompat); WriteToLocalFile(bufferToFlush, &resultDest->fileCompat);
}
} }
MemoryContextSwitchTo(oldContext); MemoryContextSwitchTo(oldContext);
@ -433,20 +398,18 @@ static void
RemoteFileDestReceiverShutdown(DestReceiver *destReceiver) RemoteFileDestReceiverShutdown(DestReceiver *destReceiver)
{ {
RemoteFileDestReceiver *resultDest = (RemoteFileDestReceiver *) destReceiver; RemoteFileDestReceiver *resultDest = (RemoteFileDestReceiver *) destReceiver;
List *connectionList = resultDest->connectionList; List *connectionList = resultDest->connectionList;
CopyOutState copyOutState = resultDest->copyOutState;
if (copyOutState->binary) StringInfo bufferToFlush = IntermediateResultEncoderDone(resultDest->encoder);
if (bufferToFlush != NULL)
{ {
/* send footers when using binary encoding */ /* send row to nodes */
resetStringInfo(copyOutState->fe_msgbuf); BroadcastCopyData(bufferToFlush, connectionList);
AppendCopyBinaryFooters(copyOutState);
BroadcastCopyData(copyOutState->fe_msgbuf, connectionList);
/* write to local file (if applicable) */
if (resultDest->writeLocalFile) if (resultDest->writeLocalFile)
{ {
WriteToLocalFile(copyOutState->fe_msgbuf, &resultDest->fileCompat); WriteToLocalFile(bufferToFlush, &resultDest->fileCompat);
} }
} }
@ -497,15 +460,7 @@ RemoteFileDestReceiverDestroy(DestReceiver *destReceiver)
{ {
RemoteFileDestReceiver *resultDest = (RemoteFileDestReceiver *) destReceiver; RemoteFileDestReceiver *resultDest = (RemoteFileDestReceiver *) destReceiver;
if (resultDest->copyOutState) IntermediateResultEncoderDestroy(resultDest->encoder);
{
pfree(resultDest->copyOutState);
}
if (resultDest->columnOutputFunctions)
{
pfree(resultDest->columnOutputFunctions);
}
pfree(resultDest); pfree(resultDest);
} }
@ -708,13 +663,14 @@ Datum
read_intermediate_result(PG_FUNCTION_ARGS) read_intermediate_result(PG_FUNCTION_ARGS)
{ {
Datum resultId = PG_GETARG_DATUM(0); Datum resultId = PG_GETARG_DATUM(0);
Datum copyFormatOidDatum = PG_GETARG_DATUM(1); Datum formatOidDatum = PG_GETARG_DATUM(1);
Datum copyFormatLabelDatum = DirectFunctionCall1(enum_out, copyFormatOidDatum); Datum formatLabelDatum = DirectFunctionCall1(enum_out, formatOidDatum);
char *copyFormatLabel = DatumGetCString(copyFormatLabelDatum); char *formatLabel = DatumGetCString(formatLabelDatum);
IntermediateResultFormat format = ParseIntermediateResultFormatLabel(formatLabel);
CheckCitusVersion(ERROR); CheckCitusVersion(ERROR);
ReadIntermediateResultsIntoFuncOutput(fcinfo, copyFormatLabel, &resultId, 1); ReadIntermediateResultsIntoFuncOutput(fcinfo, format, &resultId, 1);
PG_RETURN_DATUM(0); PG_RETURN_DATUM(0);
} }
@ -733,8 +689,9 @@ read_intermediate_result_array(PG_FUNCTION_ARGS)
ArrayType *resultIdObject = PG_GETARG_ARRAYTYPE_P(0); ArrayType *resultIdObject = PG_GETARG_ARRAYTYPE_P(0);
Datum copyFormatOidDatum = PG_GETARG_DATUM(1); Datum copyFormatOidDatum = PG_GETARG_DATUM(1);
Datum copyFormatLabelDatum = DirectFunctionCall1(enum_out, copyFormatOidDatum); Datum formatLabelDatum = DirectFunctionCall1(enum_out, copyFormatOidDatum);
char *copyFormatLabel = DatumGetCString(copyFormatLabelDatum); char *formatLabel = DatumGetCString(formatLabelDatum);
IntermediateResultFormat format = ParseIntermediateResultFormatLabel(formatLabel);
CheckCitusVersion(ERROR); CheckCitusVersion(ERROR);
@ -742,7 +699,7 @@ read_intermediate_result_array(PG_FUNCTION_ARGS)
resultIdObject)); resultIdObject));
Datum *resultIdArray = DeconstructArrayObject(resultIdObject); Datum *resultIdArray = DeconstructArrayObject(resultIdObject);
ReadIntermediateResultsIntoFuncOutput(fcinfo, copyFormatLabel, ReadIntermediateResultsIntoFuncOutput(fcinfo, format,
resultIdArray, resultCount); resultIdArray, resultCount);
PG_RETURN_DATUM(0); PG_RETURN_DATUM(0);
@ -755,7 +712,8 @@ read_intermediate_result_array(PG_FUNCTION_ARGS)
* don't exist. * don't exist.
*/ */
static void static void
ReadIntermediateResultsIntoFuncOutput(FunctionCallInfo fcinfo, char *copyFormat, ReadIntermediateResultsIntoFuncOutput(FunctionCallInfo fcinfo,
IntermediateResultFormat format,
Datum *resultIdArray, int resultCount) Datum *resultIdArray, int resultCount)
{ {
TupleDesc tupleDescriptor = NULL; TupleDesc tupleDescriptor = NULL;
@ -774,7 +732,7 @@ ReadIntermediateResultsIntoFuncOutput(FunctionCallInfo fcinfo, char *copyFormat,
errmsg("result \"%s\" does not exist", resultId))); errmsg("result \"%s\" does not exist", resultId)));
} }
ReadFileIntoTupleStore(resultFileName, copyFormat, tupleDescriptor, tupleStore); ReadFileIntoTupleStore(resultFileName, format, tupleDescriptor, tupleStore);
} }
tuplestore_donestoring(tupleStore); tuplestore_donestoring(tupleStore);
@ -1003,3 +961,21 @@ CopyDataFromConnection(MultiConnection *connection, FileCompat *fileCompat,
return CLIENT_COPY_FAILED; return CLIENT_COPY_FAILED;
} }
} }
static IntermediateResultFormat
ParseIntermediateResultFormatLabel(char *formatString)
{
if (strncmp(formatString, "text", NAMEDATALEN) == 0 ||
strncmp(formatString, "csv", NAMEDATALEN) == 0)
{
return TEXT_COPY_FORMAT;
}
else if (strncmp(formatString, "binary", NAMEDATALEN) == 0)
{
return BINARY_COPY_FORMAT;
}
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("Invalid intermediate result format: %s", formatString)));
}

View File

@ -11,6 +11,7 @@
#include "postgres.h" #include "postgres.h"
#include "miscadmin.h" #include "miscadmin.h"
#include "pgstat.h"
#include "access/xact.h" #include "access/xact.h"
#include "catalog/dependency.h" #include "catalog/dependency.h"
@ -22,6 +23,7 @@
#include "distributed/insert_select_executor.h" #include "distributed/insert_select_executor.h"
#include "distributed/insert_select_planner.h" #include "distributed/insert_select_planner.h"
#include "distributed/listutils.h" #include "distributed/listutils.h"
#include "distributed/intermediate_results.h"
#include "distributed/master_protocol.h" #include "distributed/master_protocol.h"
#include "distributed/multi_executor.h" #include "distributed/multi_executor.h"
#include "distributed/multi_master_planner.h" #include "distributed/multi_master_planner.h"
@ -31,6 +33,7 @@
#include "distributed/multi_server_executor.h" #include "distributed/multi_server_executor.h"
#include "distributed/resource_lock.h" #include "distributed/resource_lock.h"
#include "distributed/transaction_management.h" #include "distributed/transaction_management.h"
#include "distributed/transmit.h"
#include "distributed/worker_shard_visibility.h" #include "distributed/worker_shard_visibility.h"
#include "distributed/worker_protocol.h" #include "distributed/worker_protocol.h"
#include "executor/execdebug.h" #include "executor/execdebug.h"
@ -68,7 +71,6 @@ int ExecutorLevel = 0;
/* local function forward declarations */ /* local function forward declarations */
static Relation StubRelation(TupleDesc tupleDescriptor);
static bool AlterTableConstraintCheck(QueryDesc *queryDesc); static bool AlterTableConstraintCheck(QueryDesc *queryDesc);
static bool IsLocalReferenceTableJoinPlan(PlannedStmt *plan); static bool IsLocalReferenceTableJoinPlan(PlannedStmt *plan);
static List * FindCitusCustomScanStates(PlanState *planState); static List * FindCitusCustomScanStates(PlanState *planState);
@ -388,7 +390,7 @@ LoadTuplesIntoTupleStore(CitusScanState *citusScanState, Job *workerJob)
List *workerTaskList = workerJob->taskList; List *workerTaskList = workerJob->taskList;
bool randomAccess = true; bool randomAccess = true;
bool interTransactions = false; bool interTransactions = false;
char *copyFormat = "text"; IntermediateResultFormat format = TEXT_COPY_FORMAT;
TupleDesc tupleDescriptor = ScanStateGetTupleDescriptor(citusScanState); TupleDesc tupleDescriptor = ScanStateGetTupleDescriptor(citusScanState);
@ -398,7 +400,7 @@ LoadTuplesIntoTupleStore(CitusScanState *citusScanState, Job *workerJob)
if (BinaryMasterCopyFormat) if (BinaryMasterCopyFormat)
{ {
copyFormat = "binary"; format = BINARY_COPY_FORMAT;
} }
Task *workerTask = NULL; Task *workerTask = NULL;
@ -407,7 +409,7 @@ LoadTuplesIntoTupleStore(CitusScanState *citusScanState, Job *workerJob)
StringInfo jobDirectoryName = MasterJobDirectoryName(workerTask->jobId); StringInfo jobDirectoryName = MasterJobDirectoryName(workerTask->jobId);
StringInfo taskFilename = TaskFilename(jobDirectoryName, workerTask->taskId); StringInfo taskFilename = TaskFilename(jobDirectoryName, workerTask->taskId);
ReadFileIntoTupleStore(taskFilename->data, copyFormat, tupleDescriptor, ReadFileIntoTupleStore(taskFilename->data, format, tupleDescriptor,
citusScanState->tuplestorestate); citusScanState->tuplestorestate);
} }
@ -415,62 +417,6 @@ LoadTuplesIntoTupleStore(CitusScanState *citusScanState, Job *workerJob)
} }
/*
* ReadFileIntoTupleStore parses the records in a COPY-formatted file according
* according to the given tuple descriptor and stores the records in a tuple
* store.
*/
void
ReadFileIntoTupleStore(char *fileName, char *copyFormat, TupleDesc tupleDescriptor,
Tuplestorestate *tupstore)
{
/*
* Trick BeginCopyFrom into using our tuple descriptor by pretending it belongs
* to a relation.
*/
Relation stubRelation = StubRelation(tupleDescriptor);
EState *executorState = CreateExecutorState();
MemoryContext executorTupleContext = GetPerTupleMemoryContext(executorState);
ExprContext *executorExpressionContext = GetPerTupleExprContext(executorState);
int columnCount = tupleDescriptor->natts;
Datum *columnValues = palloc0(columnCount * sizeof(Datum));
bool *columnNulls = palloc0(columnCount * sizeof(bool));
List *copyOptions = NIL;
int location = -1; /* "unknown" token location */
DefElem *copyOption = makeDefElem("format", (Node *) makeString(copyFormat),
location);
copyOptions = lappend(copyOptions, copyOption);
CopyState copyState = BeginCopyFrom(NULL, stubRelation, fileName, false, NULL,
NULL, copyOptions);
while (true)
{
ResetPerTupleExprContext(executorState);
MemoryContext oldContext = MemoryContextSwitchTo(executorTupleContext);
bool nextRowFound = NextCopyFromCompat(copyState, executorExpressionContext,
columnValues, columnNulls);
if (!nextRowFound)
{
MemoryContextSwitchTo(oldContext);
break;
}
tuplestore_putvalues(tupstore, tupleDescriptor, columnValues, columnNulls);
MemoryContextSwitchTo(oldContext);
}
EndCopyFrom(copyState);
pfree(columnValues);
pfree(columnNulls);
}
/* /*
* SortTupleStore gets a CitusScanState and sorts the tuplestore by all the * SortTupleStore gets a CitusScanState and sorts the tuplestore by all the
* entries in the target entry list, starting from the first one and * entries in the target entry list, starting from the first one and
@ -568,24 +514,6 @@ SortTupleStore(CitusScanState *scanState)
} }
/*
* StubRelation creates a stub Relation from the given tuple descriptor.
* To be able to use copy.c, we need a Relation descriptor. As there is no
* relation corresponding to the data loaded from workers, we need to fake one.
* We just need the bare minimal set of fields accessed by BeginCopyFrom().
*/
static Relation
StubRelation(TupleDesc tupleDescriptor)
{
Relation stubRelation = palloc0(sizeof(RelationData));
stubRelation->rd_att = tupleDescriptor;
stubRelation->rd_rel = palloc0(sizeof(FormData_pg_class));
stubRelation->rd_rel->relkind = RELKIND_RELATION;
return stubRelation;
}
/* /*
* ExecuteQueryStringIntoDestReceiver plans and executes a query and sends results * ExecuteQueryStringIntoDestReceiver plans and executes a query and sends results
* to the given DestReceiver. * to the given DestReceiver.

View File

@ -460,8 +460,11 @@ PartitionedResultDestReceiverReceive(TupleTableSlot *slot, DestReceiver *copyDes
partitionIndex); partitionIndex);
char *filePath = QueryResultFileName(resultId->data); char *filePath = QueryResultFileName(resultId->data);
IntermediateResultFormat format =
partitionedDest->binaryCopy ? BINARY_COPY_FORMAT : TEXT_COPY_FORMAT;
partitionDest = CreateFileDestReceiver(filePath, partitionedDest->perTupleContext, partitionDest = CreateFileDestReceiver(filePath, partitionedDest->perTupleContext,
partitionedDest->binaryCopy); format);
partitionedDest->partitionDestReceivers[partitionIndex] = partitionDest; partitionedDest->partitionDestReceivers[partitionIndex] = partitionDest;
partitionDest->rStartup(partitionDest, 0, partitionedDest->tupleDescriptor); partitionDest->rStartup(partitionDest, 0, partitionedDest->tupleDescriptor);
} }

View File

@ -44,11 +44,10 @@ typedef struct TaskFileDestReceiver
/* output file */ /* output file */
char *filePath; char *filePath;
FileCompat fileCompat; FileCompat fileCompat;
bool binaryCopyFormat; IntermediateResultFormat format;
/* state on how to copy out data types */ /* state on how to copy out data types */
CopyOutState copyOutState; IntermediateResultEncoder *encoder;
FmgrInfo *columnOutputFunctions;
/* statistics */ /* statistics */
uint64 tuplesSent; uint64 tuplesSent;
@ -102,11 +101,14 @@ WorkerExecuteSqlTask(Query *query, char *taskFilename, bool binaryCopyFormat)
{ {
ParamListInfo paramListInfo = NULL; ParamListInfo paramListInfo = NULL;
IntermediateResultFormat format =
binaryCopyFormat ? BINARY_COPY_FORMAT : TEXT_COPY_FORMAT;
EState *estate = CreateExecutorState(); EState *estate = CreateExecutorState();
MemoryContext tupleContext = GetPerTupleMemoryContext(estate); MemoryContext tupleContext = GetPerTupleMemoryContext(estate);
TaskFileDestReceiver *taskFileDest = TaskFileDestReceiver *taskFileDest =
(TaskFileDestReceiver *) CreateFileDestReceiver(taskFilename, tupleContext, (TaskFileDestReceiver *) CreateFileDestReceiver(taskFilename, tupleContext,
binaryCopyFormat); format);
ExecuteQueryIntoDestReceiver(query, paramListInfo, (DestReceiver *) taskFileDest); ExecuteQueryIntoDestReceiver(query, paramListInfo, (DestReceiver *) taskFileDest);
@ -124,7 +126,8 @@ WorkerExecuteSqlTask(Query *query, char *taskFilename, bool binaryCopyFormat)
* to a file. * to a file.
*/ */
DestReceiver * DestReceiver *
CreateFileDestReceiver(char *filePath, MemoryContext tupleContext, bool binaryCopyFormat) CreateFileDestReceiver(char *filePath, MemoryContext tupleContext,
IntermediateResultFormat format)
{ {
TaskFileDestReceiver *taskFileDest = (TaskFileDestReceiver *) palloc0( TaskFileDestReceiver *taskFileDest = (TaskFileDestReceiver *) palloc0(
sizeof(TaskFileDestReceiver)); sizeof(TaskFileDestReceiver));
@ -140,7 +143,7 @@ CreateFileDestReceiver(char *filePath, MemoryContext tupleContext, bool binaryCo
taskFileDest->tupleContext = tupleContext; taskFileDest->tupleContext = tupleContext;
taskFileDest->memoryContext = CurrentMemoryContext; taskFileDest->memoryContext = CurrentMemoryContext;
taskFileDest->filePath = pstrdup(filePath); taskFileDest->filePath = pstrdup(filePath);
taskFileDest->binaryCopyFormat = binaryCopyFormat; taskFileDest->format = format;
return (DestReceiver *) taskFileDest; return (DestReceiver *) taskFileDest;
} }
@ -157,9 +160,6 @@ TaskFileDestReceiverStartup(DestReceiver *dest, int operation,
{ {
TaskFileDestReceiver *taskFileDest = (TaskFileDestReceiver *) dest; TaskFileDestReceiver *taskFileDest = (TaskFileDestReceiver *) dest;
const char *delimiterCharacter = "\t";
const char *nullPrintCharacter = "\\N";
const int fileFlags = (O_APPEND | O_CREAT | O_RDWR | O_TRUNC | PG_BINARY); const int fileFlags = (O_APPEND | O_CREAT | O_RDWR | O_TRUNC | PG_BINARY);
const int fileMode = (S_IRUSR | S_IWUSR); const int fileMode = (S_IRUSR | S_IWUSR);
@ -169,29 +169,15 @@ TaskFileDestReceiverStartup(DestReceiver *dest, int operation,
taskFileDest->tupleDescriptor = inputTupleDescriptor; taskFileDest->tupleDescriptor = inputTupleDescriptor;
/* define how tuples will be serialised */ /* define how tuples will be serialised */
CopyOutState copyOutState = (CopyOutState) palloc0(sizeof(CopyOutStateData)); taskFileDest->encoder = IntermediateResultEncoderCreate(inputTupleDescriptor,
copyOutState->delim = (char *) delimiterCharacter; taskFileDest->format,
copyOutState->null_print = (char *) nullPrintCharacter; taskFileDest->tupleContext);
copyOutState->null_print_client = (char *) nullPrintCharacter;
copyOutState->binary = taskFileDest->binaryCopyFormat;
copyOutState->fe_msgbuf = makeStringInfo();
copyOutState->rowcontext = taskFileDest->tupleContext;
taskFileDest->copyOutState = copyOutState;
taskFileDest->columnOutputFunctions = ColumnOutputFunctions(inputTupleDescriptor,
copyOutState->binary);
taskFileDest->fileCompat = FileCompatFromFileStart(FileOpenForTransmit( taskFileDest->fileCompat = FileCompatFromFileStart(FileOpenForTransmit(
taskFileDest->filePath, taskFileDest->filePath,
fileFlags, fileFlags,
fileMode)); fileMode));
if (copyOutState->binary)
{
/* write headers when using binary encoding */
AppendCopyBinaryHeaders(copyOutState);
}
MemoryContextSwitchTo(oldContext); MemoryContextSwitchTo(oldContext);
} }
@ -206,12 +192,7 @@ TaskFileDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest)
{ {
TaskFileDestReceiver *taskFileDest = (TaskFileDestReceiver *) dest; TaskFileDestReceiver *taskFileDest = (TaskFileDestReceiver *) dest;
TupleDesc tupleDescriptor = taskFileDest->tupleDescriptor; IntermediateResultEncoder *encoder = taskFileDest->encoder;
CopyOutState copyOutState = taskFileDest->copyOutState;
FmgrInfo *columnOutputFunctions = taskFileDest->columnOutputFunctions;
StringInfo copyData = copyOutState->fe_msgbuf;
MemoryContext executorTupleContext = taskFileDest->tupleContext; MemoryContext executorTupleContext = taskFileDest->tupleContext;
MemoryContext oldContext = MemoryContextSwitchTo(executorTupleContext); MemoryContext oldContext = MemoryContextSwitchTo(executorTupleContext);
@ -221,14 +202,11 @@ TaskFileDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest)
Datum *columnValues = slot->tts_values; Datum *columnValues = slot->tts_values;
bool *columnNulls = slot->tts_isnull; bool *columnNulls = slot->tts_isnull;
/* construct row in COPY format */ StringInfo bufferToFlush = IntermediateResultEncoderReceive(encoder, columnValues,
AppendCopyRowData(columnValues, columnNulls, tupleDescriptor, columnNulls);
copyOutState, columnOutputFunctions, NULL); if (bufferToFlush != NULL)
if (copyData->len > COPY_BUFFER_SIZE)
{ {
WriteToLocalFile(copyOutState->fe_msgbuf, taskFileDest); WriteToLocalFile(bufferToFlush, taskFileDest);
resetStringInfo(copyData);
} }
MemoryContextSwitchTo(oldContext); MemoryContextSwitchTo(oldContext);
@ -268,20 +246,12 @@ static void
TaskFileDestReceiverShutdown(DestReceiver *destReceiver) TaskFileDestReceiverShutdown(DestReceiver *destReceiver)
{ {
TaskFileDestReceiver *taskFileDest = (TaskFileDestReceiver *) destReceiver; TaskFileDestReceiver *taskFileDest = (TaskFileDestReceiver *) destReceiver;
CopyOutState copyOutState = taskFileDest->copyOutState;
if (copyOutState->fe_msgbuf->len > 0) IntermediateResultEncoder *encoder = taskFileDest->encoder;
StringInfo bufferToFlush = IntermediateResultEncoderDone(encoder);
if (bufferToFlush != NULL)
{ {
WriteToLocalFile(copyOutState->fe_msgbuf, taskFileDest); WriteToLocalFile(bufferToFlush, taskFileDest);
resetStringInfo(copyOutState->fe_msgbuf);
}
if (copyOutState->binary)
{
/* write footers when using binary encoding */
AppendCopyBinaryFooters(copyOutState);
WriteToLocalFile(copyOutState->fe_msgbuf, taskFileDest);
resetStringInfo(copyOutState->fe_msgbuf);
} }
FileClose(taskFileDest->fileCompat.fd); FileClose(taskFileDest->fileCompat.fd);
@ -297,15 +267,7 @@ TaskFileDestReceiverDestroy(DestReceiver *destReceiver)
{ {
TaskFileDestReceiver *taskFileDest = (TaskFileDestReceiver *) destReceiver; TaskFileDestReceiver *taskFileDest = (TaskFileDestReceiver *) destReceiver;
if (taskFileDest->copyOutState) IntermediateResultEncoderDestroy(taskFileDest->encoder);
{
pfree(taskFileDest->copyOutState);
}
if (taskFileDest->columnOutputFunctions)
{
pfree(taskFileDest->columnOutputFunctions);
}
pfree(taskFileDest->filePath); pfree(taskFileDest->filePath);
pfree(taskFileDest); pfree(taskFileDest);

View File

@ -48,6 +48,24 @@ typedef struct DistributedResultFragment
} DistributedResultFragment; } DistributedResultFragment;
/* constants for intermediate result encoding formats */
typedef enum IntermediateResultFormat
{
TEXT_COPY_FORMAT,
BINARY_COPY_FORMAT
} IntermediateResultFormat;
/*
* IntermediateResultEncoder represents encoder state for intermediate result
* files. This structure is created by IntermediateResultEncoderCreate(), and
* then user should use IntermediateResultEncoderReceive() for encoding each
* row. Finally, users should call IntermediateResultEncoderDone() to finish
* the encoding.
*/
typedef struct IntermediateResultEncoder IntermediateResultEncoder;
/* intermediate_results.c */ /* intermediate_results.c */
extern DestReceiver * CreateRemoteFileDestReceiver(const char *resultId, extern DestReceiver * CreateRemoteFileDestReceiver(const char *resultId,
EState *executorState, EState *executorState,
@ -60,6 +78,19 @@ extern int64 IntermediateResultSize(const char *resultId);
extern char * QueryResultFileName(const char *resultId); extern char * QueryResultFileName(const char *resultId);
extern char * CreateIntermediateResultsDirectory(void); extern char * CreateIntermediateResultsDirectory(void);
/* encoding intermediate result files */
extern IntermediateResultEncoder * IntermediateResultEncoderCreate(TupleDesc tupleDesc,
IntermediateResultFormat
format, MemoryContext
tupleContext);
extern StringInfo IntermediateResultEncoderReceive(IntermediateResultEncoder *encoder,
Datum *values, bool *nulls);
extern StringInfo IntermediateResultEncoderDone(IntermediateResultEncoder *encoder);
extern void IntermediateResultEncoderDestroy(IntermediateResultEncoder *encoder);
extern void ReadFileIntoTupleStore(char *fileName, IntermediateResultFormat format,
TupleDesc tupleDescriptor, Tuplestorestate *tupstore);
extern IntermediateResultFormat ResultFileFormatForTupleDesc(TupleDesc tupleDesc);
/* distributed_intermediate_results.c */ /* distributed_intermediate_results.c */
extern List ** RedistributeTaskListResults(const char *resultIdPrefix, extern List ** RedistributeTaskListResults(const char *resultIdPrefix,
List *selectTaskList, List *selectTaskList,

View File

@ -92,8 +92,6 @@ extern bool IsCitusCustomState(PlanState *planState);
extern TupleTableSlot * CitusExecScan(CustomScanState *node); extern TupleTableSlot * CitusExecScan(CustomScanState *node);
extern TupleTableSlot * ReturnTupleFromTuplestore(CitusScanState *scanState); extern TupleTableSlot * ReturnTupleFromTuplestore(CitusScanState *scanState);
extern void LoadTuplesIntoTupleStore(CitusScanState *citusScanState, Job *workerJob); extern void LoadTuplesIntoTupleStore(CitusScanState *citusScanState, Job *workerJob);
extern void ReadFileIntoTupleStore(char *fileName, char *copyFormat, TupleDesc
tupleDescriptor, Tuplestorestate *tupstore);
extern Query * ParseQueryString(const char *queryString, Oid *paramOids, int numParams); extern Query * ParseQueryString(const char *queryString, Oid *paramOids, int numParams);
extern Query * RewriteRawQueryStmt(RawStmt *rawStmt, const char *queryString, extern Query * RewriteRawQueryStmt(RawStmt *rawStmt, const char *queryString,
Oid *paramOids, int numParams); Oid *paramOids, int numParams);

View File

@ -17,6 +17,7 @@
#include "postgres.h" #include "postgres.h"
#include "fmgr.h" #include "fmgr.h"
#include "distributed/intermediate_results.h"
#include "distributed/shardinterval_utils.h" #include "distributed/shardinterval_utils.h"
#include "lib/stringinfo.h" #include "lib/stringinfo.h"
#include "nodes/parsenodes.h" #include "nodes/parsenodes.h"
@ -138,7 +139,7 @@ extern CreateStmt * CreateStatement(RangeVar *relation, List *columnDefinitionLi
extern CopyStmt * CopyStatement(RangeVar *relation, char *sourceFilename); extern CopyStmt * CopyStatement(RangeVar *relation, char *sourceFilename);
extern DestReceiver * CreateFileDestReceiver(char *filePath, extern DestReceiver * CreateFileDestReceiver(char *filePath,
MemoryContext tupleContext, MemoryContext tupleContext,
bool binaryCopyFormat); IntermediateResultFormat format);
extern void FileDestReceiverStats(DestReceiver *dest, extern void FileDestReceiverStats(DestReceiver *dest,
uint64 *rowsSent, uint64 *rowsSent,
uint64 *bytesSent); uint64 *bytesSent);