mirror of https://github.com/citusdata/citus.git
Create write_intermediate_result()
parent
9463509e4a
commit
b73b0242bf
|
@ -39,6 +39,7 @@
|
|||
#include "utils/lsyscache.h"
|
||||
#include "utils/memutils.h"
|
||||
#include "utils/syscache.h"
|
||||
#include "utils/typcache.h"
|
||||
|
||||
|
||||
static bool CreatedResultsDirectory = false;
|
||||
|
@ -98,6 +99,125 @@ static char * QueryResultFileName(const char *resultId);
|
|||
PG_FUNCTION_INFO_V1(read_intermediate_result);
|
||||
PG_FUNCTION_INFO_V1(broadcast_intermediate_result);
|
||||
PG_FUNCTION_INFO_V1(create_intermediate_result);
|
||||
PG_FUNCTION_INFO_V1(write_intermediate_result_sfunc);
|
||||
PG_FUNCTION_INFO_V1(write_intermediate_result_final);
|
||||
|
||||
|
||||
typedef struct
|
||||
{
|
||||
TupleDesc tupleDescriptor;
|
||||
FmgrInfo *columnOutputFunctions;
|
||||
MemoryContext rowContext;
|
||||
CopyOutState copyOutState;
|
||||
FileCompat fileCompat;
|
||||
Datum *values;
|
||||
bool *nulls;
|
||||
} WriteIntermediateResultsState;
|
||||
|
||||
Datum
|
||||
write_intermediate_result_sfunc(PG_FUNCTION_ARGS)
|
||||
{
|
||||
WriteIntermediateResultsState *state =
|
||||
(WriteIntermediateResultsState *) PG_GETARG_POINTER(0);
|
||||
HeapTupleHeader rec = PG_GETARG_HEAPTUPLEHEADER(2);
|
||||
|
||||
if (state == NULL)
|
||||
{
|
||||
MemoryContext agg_context;
|
||||
MemoryContext old_context;
|
||||
|
||||
text *resultId = PG_GETARG_TEXT_P(1);
|
||||
char *resultIdString = text_to_cstring(resultId);
|
||||
|
||||
if (!AggCheckCallContext(fcinfo, &agg_context))
|
||||
{
|
||||
elog(ERROR, "aggregate function called in non-aggregate context");
|
||||
}
|
||||
|
||||
old_context = MemoryContextSwitchTo(agg_context);
|
||||
state = palloc0(sizeof(WriteIntermediateResultsState));
|
||||
|
||||
Oid tupType = HeapTupleHeaderGetTypeId(rec);
|
||||
int32 tupTypmod = HeapTupleHeaderGetTypMod(rec);
|
||||
state->tupleDescriptor = lookup_rowtype_tupdesc(tupType, tupTypmod);
|
||||
|
||||
int natts = state->tupleDescriptor->natts;
|
||||
state->values = palloc(natts * sizeof(Datum));
|
||||
state->nulls = (bool *) palloc(natts * sizeof(bool));
|
||||
|
||||
/* define how tuples will be serialised */
|
||||
const char *delimiterCharacter = "\t";
|
||||
const char *nullPrintCharacter = "\\N";
|
||||
CopyOutState copyOutState = (CopyOutState) palloc0(sizeof(CopyOutStateData));
|
||||
copyOutState->delim = (char *) delimiterCharacter;
|
||||
copyOutState->null_print = (char *) nullPrintCharacter;
|
||||
copyOutState->null_print_client = (char *) nullPrintCharacter;
|
||||
copyOutState->binary = false;/*CanUseBinaryCopyFormat(state->tupleDescriptor); */
|
||||
copyOutState->fe_msgbuf = makeStringInfo();
|
||||
copyOutState->rowcontext =
|
||||
AllocSetContextCreate(agg_context, "COPY TO", ALLOCSET_DEFAULT_SIZES);
|
||||
|
||||
state->copyOutState = copyOutState;
|
||||
|
||||
/* make sure the directory exists */
|
||||
CreateIntermediateResultsDirectory();
|
||||
|
||||
const int fileFlags = (O_APPEND | O_CREAT | O_RDWR | O_TRUNC | PG_BINARY);
|
||||
const int fileMode = (S_IRUSR | S_IWUSR);
|
||||
char *filePath = QueryResultFileName(resultIdString);
|
||||
state->fileCompat =
|
||||
FileCompatFromFileStart(FileOpenForTransmit(filePath, fileFlags, fileMode));
|
||||
|
||||
state->columnOutputFunctions = ColumnOutputFunctions(state->tupleDescriptor,
|
||||
copyOutState->binary);
|
||||
|
||||
MemoryContextSwitchTo(old_context);
|
||||
}
|
||||
|
||||
HeapTupleData tuple;
|
||||
tuple.t_len = HeapTupleHeaderGetDatumLength(rec);
|
||||
ItemPointerSetInvalid(&(tuple.t_self));
|
||||
tuple.t_tableOid = InvalidOid;
|
||||
tuple.t_data = rec;
|
||||
|
||||
TupleDesc tupleDesc = state->tupleDescriptor;
|
||||
Datum *values = state->values;
|
||||
bool *nulls = state->nulls;
|
||||
heap_deform_tuple(&tuple, tupleDesc, values, nulls);
|
||||
|
||||
AppendCopyRowData(values, nulls, tupleDesc,
|
||||
state->copyOutState,
|
||||
state->columnOutputFunctions, NULL);
|
||||
|
||||
StringInfo copyData = state->copyOutState->fe_msgbuf;
|
||||
if (copyData->len >= 1024 * 1024)
|
||||
{
|
||||
FileWriteCompat(&state->fileCompat, copyData->data,
|
||||
copyData->len, PG_WAIT_IO);
|
||||
resetStringInfo(copyData);
|
||||
}
|
||||
|
||||
PG_RETURN_POINTER(state);
|
||||
}
|
||||
|
||||
|
||||
Datum
|
||||
write_intermediate_result_final(PG_FUNCTION_ARGS)
|
||||
{
|
||||
WriteIntermediateResultsState *state =
|
||||
(WriteIntermediateResultsState *) PG_GETARG_POINTER(0);
|
||||
ReleaseTupleDesc(state->tupleDescriptor);
|
||||
|
||||
StringInfo copyData = state->copyOutState->fe_msgbuf;
|
||||
if (copyData->len > 0)
|
||||
{
|
||||
FileWriteCompat(&state->fileCompat, copyData->data,
|
||||
copyData->len, PG_WAIT_IO);
|
||||
}
|
||||
|
||||
FileClose(state->fileCompat.fd);
|
||||
PG_RETURN_VOID();
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
|
|
|
@ -0,0 +1,16 @@
|
|||
CREATE FUNCTION write_intermediate_result_sfunc(state INTERNAL, filename TEXT, tuple RECORD)
|
||||
RETURNS INTERNAL
|
||||
LANGUAGE C
|
||||
CALLED ON NULL INPUT
|
||||
AS 'MODULE_PATHNAME', $$write_intermediate_result_sfunc$$;
|
||||
|
||||
CREATE FUNCTION write_intermediate_result_final(state INTERNAL)
|
||||
RETURNS VOID
|
||||
LANGUAGE C STRICT
|
||||
AS 'MODULE_PATHNAME', $$write_intermediate_result_final$$;
|
||||
|
||||
CREATE AGGREGATE write_intermediate_result(filename TEXT, tuple RECORD) (
|
||||
STYPE=INTERNAL,
|
||||
SFUNC=write_intermediate_result_sfunc,
|
||||
FINALFUNC=write_intermediate_result_final
|
||||
);
|
Loading…
Reference in New Issue