From b73b0242bfba464cf63e2dadd7d3e9e731bca646 Mon Sep 17 00:00:00 2001 From: Hadi Moshayedi Date: Thu, 5 Dec 2019 10:48:53 -0800 Subject: [PATCH] Create write_intermediate_result() --- .../executor/intermediate_results.c | 120 ++++++++++++++++++ .../distributed/sql/citus--9.1-1--9.2-1.sql | 16 +++ 2 files changed, 136 insertions(+) diff --git a/src/backend/distributed/executor/intermediate_results.c b/src/backend/distributed/executor/intermediate_results.c index fed7ad6cf..a6344d345 100644 --- a/src/backend/distributed/executor/intermediate_results.c +++ b/src/backend/distributed/executor/intermediate_results.c @@ -39,6 +39,7 @@ #include "utils/lsyscache.h" #include "utils/memutils.h" #include "utils/syscache.h" +#include "utils/typcache.h" static bool CreatedResultsDirectory = false; @@ -98,6 +99,125 @@ static char * QueryResultFileName(const char *resultId); PG_FUNCTION_INFO_V1(read_intermediate_result); PG_FUNCTION_INFO_V1(broadcast_intermediate_result); PG_FUNCTION_INFO_V1(create_intermediate_result); +PG_FUNCTION_INFO_V1(write_intermediate_result_sfunc); +PG_FUNCTION_INFO_V1(write_intermediate_result_final); + + +typedef struct +{ + TupleDesc tupleDescriptor; + FmgrInfo *columnOutputFunctions; + MemoryContext rowContext; + CopyOutState copyOutState; + FileCompat fileCompat; + Datum *values; + bool *nulls; +} WriteIntermediateResultsState; + +Datum +write_intermediate_result_sfunc(PG_FUNCTION_ARGS) +{ + WriteIntermediateResultsState *state = + (WriteIntermediateResultsState *) PG_GETARG_POINTER(0); + HeapTupleHeader rec = PG_GETARG_HEAPTUPLEHEADER(2); + + if (state == NULL) + { + MemoryContext agg_context; + MemoryContext old_context; + + text *resultId = PG_GETARG_TEXT_P(1); + char *resultIdString = text_to_cstring(resultId); + + if (!AggCheckCallContext(fcinfo, &agg_context)) + { + elog(ERROR, "aggregate function called in non-aggregate context"); + } + + old_context = MemoryContextSwitchTo(agg_context); + state = palloc0(sizeof(WriteIntermediateResultsState)); + + Oid tupType = HeapTupleHeaderGetTypeId(rec); + int32 tupTypmod = HeapTupleHeaderGetTypMod(rec); + state->tupleDescriptor = lookup_rowtype_tupdesc(tupType, tupTypmod); + + int natts = state->tupleDescriptor->natts; + state->values = palloc(natts * sizeof(Datum)); + state->nulls = (bool *) palloc(natts * sizeof(bool)); + + /* define how tuples will be serialised */ + const char *delimiterCharacter = "\t"; + const char *nullPrintCharacter = "\\N"; + CopyOutState copyOutState = (CopyOutState) palloc0(sizeof(CopyOutStateData)); + copyOutState->delim = (char *) delimiterCharacter; + copyOutState->null_print = (char *) nullPrintCharacter; + copyOutState->null_print_client = (char *) nullPrintCharacter; + copyOutState->binary = false;/*CanUseBinaryCopyFormat(state->tupleDescriptor); */ + copyOutState->fe_msgbuf = makeStringInfo(); + copyOutState->rowcontext = + AllocSetContextCreate(agg_context, "COPY TO", ALLOCSET_DEFAULT_SIZES); + + state->copyOutState = copyOutState; + + /* make sure the directory exists */ + CreateIntermediateResultsDirectory(); + + const int fileFlags = (O_APPEND | O_CREAT | O_RDWR | O_TRUNC | PG_BINARY); + const int fileMode = (S_IRUSR | S_IWUSR); + char *filePath = QueryResultFileName(resultIdString); + state->fileCompat = + FileCompatFromFileStart(FileOpenForTransmit(filePath, fileFlags, fileMode)); + + state->columnOutputFunctions = ColumnOutputFunctions(state->tupleDescriptor, + copyOutState->binary); + + MemoryContextSwitchTo(old_context); + } + + HeapTupleData tuple; + tuple.t_len = HeapTupleHeaderGetDatumLength(rec); + ItemPointerSetInvalid(&(tuple.t_self)); + tuple.t_tableOid = InvalidOid; + tuple.t_data = rec; + + TupleDesc tupleDesc = state->tupleDescriptor; + Datum *values = state->values; + bool *nulls = state->nulls; + heap_deform_tuple(&tuple, tupleDesc, values, nulls); + + AppendCopyRowData(values, nulls, tupleDesc, + state->copyOutState, + state->columnOutputFunctions, NULL); + + StringInfo copyData = state->copyOutState->fe_msgbuf; + if (copyData->len >= 1024 * 1024) + { + FileWriteCompat(&state->fileCompat, copyData->data, + copyData->len, PG_WAIT_IO); + resetStringInfo(copyData); + } + + PG_RETURN_POINTER(state); +} + + +Datum +write_intermediate_result_final(PG_FUNCTION_ARGS) +{ + WriteIntermediateResultsState *state = + (WriteIntermediateResultsState *) PG_GETARG_POINTER(0); + ReleaseTupleDesc(state->tupleDescriptor); + + StringInfo copyData = state->copyOutState->fe_msgbuf; + if (copyData->len > 0) + { + FileWriteCompat(&state->fileCompat, copyData->data, + copyData->len, PG_WAIT_IO); + } + + FileClose(state->fileCompat.fd); + PG_RETURN_VOID(); +} /* diff --git a/src/backend/distributed/sql/citus--9.1-1--9.2-1.sql b/src/backend/distributed/sql/citus--9.1-1--9.2-1.sql index e69de29bb..f9768a64d 100644 --- a/src/backend/distributed/sql/citus--9.1-1--9.2-1.sql +++ b/src/backend/distributed/sql/citus--9.1-1--9.2-1.sql @@ -0,0 +1,16 @@ +CREATE FUNCTION write_intermediate_result_sfunc(state INTERNAL, filename TEXT, tuple RECORD) +RETURNS INTERNAL +LANGUAGE C +CALLED ON NULL INPUT +AS 'MODULE_PATHNAME', $$write_intermediate_result_sfunc$$; + +CREATE FUNCTION write_intermediate_result_final(state INTERNAL) +RETURNS VOID +LANGUAGE C STRICT +AS 'MODULE_PATHNAME', $$write_intermediate_result_final$$; + +CREATE AGGREGATE write_intermediate_result(filename TEXT, tuple RECORD) ( + STYPE=INTERNAL, + SFUNC=write_intermediate_result_sfunc, + FINALFUNC=write_intermediate_result_final +);