Add intermediate results infrastructure

pull/1829/head
Marco Slot 2017-11-24 15:51:20 +01:00
parent bfcc76df69
commit 4cdadfcab6
12 changed files with 868 additions and 4 deletions

View File

@ -13,7 +13,7 @@ EXTVERSIONS = 5.0 5.0-1 5.0-2 \
6.2-1 6.2-2 6.2-3 6.2-4 \ 6.2-1 6.2-2 6.2-3 6.2-4 \
7.0-1 7.0-2 7.0-3 7.0-4 7.0-5 7.0-6 7.0-7 7.0-8 7.0-9 7.0-10 7.0-11 7.0-12 7.0-13 7.0-14 7.0-15 \ 7.0-1 7.0-2 7.0-3 7.0-4 7.0-5 7.0-6 7.0-7 7.0-8 7.0-9 7.0-10 7.0-11 7.0-12 7.0-13 7.0-14 7.0-15 \
7.1-1 7.1-2 7.1-3 7.1-4 \ 7.1-1 7.1-2 7.1-3 7.1-4 \
7.2-1 7.2-1 7.2-2
# All citus--*.sql files in the source directory # All citus--*.sql files in the source directory
DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql)) DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql))
@ -181,6 +181,8 @@ $(EXTENSION)--7.1-4.sql: $(EXTENSION)--7.1-3.sql $(EXTENSION)--7.1-3--7.1-4.sql
cat $^ > $@ cat $^ > $@
$(EXTENSION)--7.2-1.sql: $(EXTENSION)--7.1-4.sql $(EXTENSION)--7.1-4--7.2-1.sql $(EXTENSION)--7.2-1.sql: $(EXTENSION)--7.1-4.sql $(EXTENSION)--7.1-4--7.2-1.sql
cat $^ > $@ cat $^ > $@
$(EXTENSION)--7.2-2.sql: $(EXTENSION)--7.2-1.sql $(EXTENSION)--7.2-1--7.2-2.sql
cat $^ > $@
NO_PGXS = 1 NO_PGXS = 1

View File

@ -0,0 +1,24 @@
/* citus--7.2-1--7.2-2 */
CREATE TYPE citus.copy_format AS ENUM ('csv', 'binary', 'text');
CREATE OR REPLACE FUNCTION pg_catalog.read_intermediate_result(result_id text, format citus.copy_format default 'csv')
RETURNS record
LANGUAGE C STRICT VOLATILE PARALLEL SAFE
AS 'MODULE_PATHNAME', $$read_intermediate_result$$;
COMMENT ON FUNCTION pg_catalog.read_intermediate_result(text,citus.copy_format)
IS 'read a file and return it as a set of records';
CREATE OR REPLACE FUNCTION pg_catalog.create_intermediate_result(result_id text, query text)
RETURNS bigint
LANGUAGE C STRICT VOLATILE
AS 'MODULE_PATHNAME', $$create_intermediate_result$$;
COMMENT ON FUNCTION pg_catalog.create_intermediate_result(text,text)
IS 'execute a query and write its results to local result file';
CREATE OR REPLACE FUNCTION pg_catalog.broadcast_intermediate_result(result_id text, query text)
RETURNS bigint
LANGUAGE C STRICT VOLATILE
AS 'MODULE_PATHNAME', $$broadcast_intermediate_result$$;
COMMENT ON FUNCTION pg_catalog.broadcast_intermediate_result(text,text)
IS 'execute a query and write its results to an result file on all workers';

View File

@ -1,6 +1,6 @@
# Citus extension # Citus extension
comment = 'Citus distributed database' comment = 'Citus distributed database'
default_version = '7.2-1' default_version = '7.2-2'
module_pathname = '$libdir/citus' module_pathname = '$libdir/citus'
relocatable = false relocatable = false
schema = pg_catalog schema = pg_catalog

View File

@ -17,6 +17,7 @@
#include "distributed/relay_utility.h" #include "distributed/relay_utility.h"
#include "distributed/transmit.h" #include "distributed/transmit.h"
#include "distributed/worker_protocol.h"
#include "distributed/version_compat.h" #include "distributed/version_compat.h"
#include "libpq/libpq.h" #include "libpq/libpq.h"
#include "libpq/pqformat.h" #include "libpq/pqformat.h"
@ -24,7 +25,6 @@
/* Local functions forward declarations */ /* Local functions forward declarations */
static File FileOpenForTransmit(const char *filename, int fileFlags, int fileMode);
static void SendCopyInStart(void); static void SendCopyInStart(void);
static void SendCopyOutStart(void); static void SendCopyOutStart(void);
static void SendCopyDone(void); static void SendCopyDone(void);
@ -150,7 +150,7 @@ FreeStringInfo(StringInfo stringInfo)
* the function returns the internal file handle for the opened file. On failure * the function returns the internal file handle for the opened file. On failure
* the function errors out. * the function errors out.
*/ */
static File File
FileOpenForTransmit(const char *filename, int fileFlags, int fileMode) FileOpenForTransmit(const char *filename, int fileFlags, int fileMode)
{ {
File fileDesc = -1; File fileDesc = -1;

View File

@ -0,0 +1,755 @@
/*-------------------------------------------------------------------------
*
* intermediate_results.c
* Functions for writing and reading intermediate results.
*
* Copyright (c) 2017, Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#include <sys/stat.h>
#include <unistd.h>
#include "postgres.h"
#include "funcapi.h"
#include "libpq-fe.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "catalog/pg_enum.h"
#include "commands/copy.h"
#include "distributed/connection_management.h"
#include "distributed/intermediate_results.h"
#include "distributed/master_metadata_utility.h"
#include "distributed/metadata_cache.h"
#include "distributed/multi_copy.h"
#include "distributed/multi_executor.h"
#include "distributed/remote_commands.h"
#include "distributed/transmit.h"
#include "distributed/transaction_identifier.h"
#include "distributed/worker_protocol.h"
#include "nodes/makefuncs.h"
#include "nodes/parsenodes.h"
#include "nodes/primnodes.h"
#include "storage/fd.h"
#include "tcop/tcopprot.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/syscache.h"
static bool CreatedResultsDirectory = false;
/* CopyDestReceiver can be used to stream results into a distributed table */
typedef struct RemoteFileDestReceiver
{
/* public DestReceiver interface */
DestReceiver pub;
char *resultId;
/* descriptor of the tuples that are sent to the worker */
TupleDesc tupleDescriptor;
/* EState for per-tuple memory allocation */
EState *executorState;
/* MemoryContext for DestReceiver session */
MemoryContext memoryContext;
/* worker nodes to send data to */
List *initialNodeList;
List *connectionList;
/* whether to write to a local file */
bool writeLocalFile;
File fileDesc;
/* state on how to copy out data types */
CopyOutState copyOutState;
FmgrInfo *columnOutputFunctions;
/* number of tuples sent */
uint64 tuplesSent;
} RemoteFileDestReceiver;
static RemoteFileDestReceiver * CreateRemoteFileDestReceiver(char *resultId,
EState *executorState,
List *initialNodeList,
bool writeLocalFile);
static void RemoteFileDestReceiverStartup(DestReceiver *dest, int operation,
TupleDesc inputTupleDescriptor);
static StringInfo ConstructCopyResultStatement(const char *resultId);
static void WriteToLocalFile(StringInfo copyData, File fileDesc);
static bool RemoteFileDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest);
static void BroadcastCopyData(StringInfo dataBuffer, List *connectionList);
static void SendCopyDataOverConnection(StringInfo dataBuffer,
MultiConnection *connection);
static void RemoteFileDestReceiverShutdown(DestReceiver *destReceiver);
static void RemoteFileDestReceiverDestroy(DestReceiver *destReceiver);
static char * CreateIntermediateResultsDirectory(void);
static char * IntermediateResultsDirectory(void);
static char * QueryResultFileName(const char *resultId);
/* exports for SQL callable functions */
PG_FUNCTION_INFO_V1(read_intermediate_result);
PG_FUNCTION_INFO_V1(broadcast_intermediate_result);
PG_FUNCTION_INFO_V1(create_intermediate_result);
/*
* broadcast_intermediate_result executes a query and streams the results
* into a file on all workers.
*/
Datum
broadcast_intermediate_result(PG_FUNCTION_ARGS)
{
text *resultIdText = PG_GETARG_TEXT_P(0);
char *resultIdString = text_to_cstring(resultIdText);
text *queryText = PG_GETARG_TEXT_P(1);
char *queryString = text_to_cstring(queryText);
EState *estate = NULL;
List *nodeList = NIL;
bool writeLocalFile = false;
RemoteFileDestReceiver *resultDest = NULL;
ParamListInfo paramListInfo = NULL;
CheckCitusVersion(ERROR);
nodeList = ActivePrimaryNodeList();
estate = CreateExecutorState();
resultDest = CreateRemoteFileDestReceiver(resultIdString, estate, nodeList,
writeLocalFile);
ExecuteQueryStringIntoDestReceiver(queryString, paramListInfo,
(DestReceiver *) resultDest);
FreeExecutorState(estate);
PG_RETURN_INT64(resultDest->tuplesSent);
}
/*
* create_intermediate_result executes a query and writes the results
* into a local file.
*/
Datum
create_intermediate_result(PG_FUNCTION_ARGS)
{
text *resultIdText = PG_GETARG_TEXT_P(0);
char *resultIdString = text_to_cstring(resultIdText);
text *queryText = PG_GETARG_TEXT_P(1);
char *queryString = text_to_cstring(queryText);
EState *estate = NULL;
List *nodeList = NIL;
bool writeLocalFile = true;
RemoteFileDestReceiver *resultDest = NULL;
ParamListInfo paramListInfo = NULL;
CheckCitusVersion(ERROR);
estate = CreateExecutorState();
resultDest = CreateRemoteFileDestReceiver(resultIdString, estate, nodeList,
writeLocalFile);
ExecuteQueryStringIntoDestReceiver(queryString, paramListInfo,
(DestReceiver *) resultDest);
FreeExecutorState(estate);
PG_RETURN_INT64(resultDest->tuplesSent);
}
/*
* CreateRemoteFileDestReceiver creates a DestReceiver that streams results
* to a set of worker nodes.
*/
static RemoteFileDestReceiver *
CreateRemoteFileDestReceiver(char *resultId, EState *executorState,
List *initialNodeList, bool writeLocalFile)
{
RemoteFileDestReceiver *resultDest = NULL;
resultDest = (RemoteFileDestReceiver *) palloc0(sizeof(RemoteFileDestReceiver));
/* set up the DestReceiver function pointers */
resultDest->pub.receiveSlot = RemoteFileDestReceiverReceive;
resultDest->pub.rStartup = RemoteFileDestReceiverStartup;
resultDest->pub.rShutdown = RemoteFileDestReceiverShutdown;
resultDest->pub.rDestroy = RemoteFileDestReceiverDestroy;
resultDest->pub.mydest = DestCopyOut;
/* set up output parameters */
resultDest->resultId = resultId;
resultDest->executorState = executorState;
resultDest->initialNodeList = initialNodeList;
resultDest->memoryContext = CurrentMemoryContext;
resultDest->writeLocalFile = writeLocalFile;
return resultDest;
}
/*
* RemoteFileDestReceiverStartup implements the rStartup interface of
* RemoteFileDestReceiver. It opens the relation
*/
static void
RemoteFileDestReceiverStartup(DestReceiver *dest, int operation,
TupleDesc inputTupleDescriptor)
{
RemoteFileDestReceiver *resultDest = (RemoteFileDestReceiver *) dest;
const char *resultId = resultDest->resultId;
CopyOutState copyOutState = NULL;
const char *delimiterCharacter = "\t";
const char *nullPrintCharacter = "\\N";
List *initialNodeList = resultDest->initialNodeList;
ListCell *initialNodeCell = NULL;
List *connectionList = NIL;
ListCell *connectionCell = NULL;
resultDest->tupleDescriptor = inputTupleDescriptor;
/* define how tuples will be serialised */
copyOutState = (CopyOutState) palloc0(sizeof(CopyOutStateData));
copyOutState->delim = (char *) delimiterCharacter;
copyOutState->null_print = (char *) nullPrintCharacter;
copyOutState->null_print_client = (char *) nullPrintCharacter;
copyOutState->binary = CanUseBinaryCopyFormat(inputTupleDescriptor);
copyOutState->fe_msgbuf = makeStringInfo();
copyOutState->rowcontext = GetPerTupleMemoryContext(resultDest->executorState);
resultDest->copyOutState = copyOutState;
resultDest->columnOutputFunctions = ColumnOutputFunctions(inputTupleDescriptor,
copyOutState->binary);
/*
* Make sure that this transaction has a distributed transaction ID.
*
* Intermediate results will be stored in a directory that is derived from
* the distributed transaction ID across all workers and on the coordinator
* itself. Even if we only store results locally, we still want to assign
* a transaction ID in case we later store results on workers.
*
* When we start using broadcast_intermediate_result from workers, we
* need to make sure that we don't override the transaction ID here.
*/
BeginOrContinueCoordinatedTransaction();
if (resultDest->writeLocalFile)
{
const int fileFlags = (O_APPEND | O_CREAT | O_RDWR | O_TRUNC | PG_BINARY);
const int fileMode = (S_IRUSR | S_IWUSR);
const char *fileName = NULL;
/* make sure the directory exists */
CreateIntermediateResultsDirectory();
fileName = QueryResultFileName(resultId);
elog(DEBUG1, "writing to local file \"%s\"", fileName);
resultDest->fileDesc = FileOpenForTransmit(fileName, fileFlags, fileMode);
}
foreach(initialNodeCell, initialNodeList)
{
WorkerNode *workerNode = (WorkerNode *) lfirst(initialNodeCell);
int connectionFlags = 0;
char *nodeName = workerNode->workerName;
int nodePort = workerNode->workerPort;
MultiConnection *connection = NULL;
connection = StartNodeConnection(connectionFlags, nodeName, nodePort);
ClaimConnectionExclusively(connection);
MarkRemoteTransactionCritical(connection);
connectionList = lappend(connectionList, connection);
}
FinishConnectionListEstablishment(connectionList);
/* must open transaction blocks to use intermediate results */
RemoteTransactionsBeginIfNecessary(connectionList);
foreach(connectionCell, connectionList)
{
MultiConnection *connection = (MultiConnection *) lfirst(connectionCell);
StringInfo copyCommand = NULL;
bool querySent = false;
copyCommand = ConstructCopyResultStatement(resultId);
querySent = SendRemoteCommand(connection, copyCommand->data);
if (!querySent)
{
ReportConnectionError(connection, ERROR);
}
}
foreach(connectionCell, connectionList)
{
MultiConnection *connection = (MultiConnection *) lfirst(connectionCell);
bool raiseInterrupts = true;
PGresult *result = GetRemoteCommandResult(connection, raiseInterrupts);
if (PQresultStatus(result) != PGRES_COPY_IN)
{
ReportResultError(connection, result, ERROR);
}
PQclear(result);
}
if (copyOutState->binary)
{
/* send headers when using binary encoding */
resetStringInfo(copyOutState->fe_msgbuf);
AppendCopyBinaryHeaders(copyOutState);
BroadcastCopyData(copyOutState->fe_msgbuf, connectionList);
if (resultDest->writeLocalFile)
{
WriteToLocalFile(copyOutState->fe_msgbuf, resultDest->fileDesc);
}
}
resultDest->connectionList = connectionList;
}
/*
* ConstructCopyResultStatement constructs the text of a COPY statement
* for copying into a result file.
*/
static StringInfo
ConstructCopyResultStatement(const char *resultId)
{
StringInfo command = makeStringInfo();
appendStringInfo(command, "COPY \"%s\" FROM STDIN WITH (format result)",
resultId);
return command;
}
/*
* RemoteFileDestReceiverReceive implements the receiveSlot function of
* RemoteFileDestReceiver. It takes a TupleTableSlot and sends the contents to
* all worker nodes.
*/
static bool
RemoteFileDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest)
{
RemoteFileDestReceiver *resultDest = (RemoteFileDestReceiver *) dest;
TupleDesc tupleDescriptor = resultDest->tupleDescriptor;
List *connectionList = resultDest->connectionList;
CopyOutState copyOutState = resultDest->copyOutState;
FmgrInfo *columnOutputFunctions = resultDest->columnOutputFunctions;
Datum *columnValues = NULL;
bool *columnNulls = NULL;
StringInfo copyData = copyOutState->fe_msgbuf;
EState *executorState = resultDest->executorState;
MemoryContext executorTupleContext = GetPerTupleMemoryContext(executorState);
MemoryContext oldContext = MemoryContextSwitchTo(executorTupleContext);
slot_getallattrs(slot);
columnValues = slot->tts_values;
columnNulls = slot->tts_isnull;
resetStringInfo(copyData);
/* construct row in COPY format */
AppendCopyRowData(columnValues, columnNulls, tupleDescriptor,
copyOutState, columnOutputFunctions, NULL);
/* send row to nodes */
BroadcastCopyData(copyData, connectionList);
/* write to local file (if applicable) */
if (resultDest->writeLocalFile)
{
WriteToLocalFile(copyOutState->fe_msgbuf, resultDest->fileDesc);
}
MemoryContextSwitchTo(oldContext);
resultDest->tuplesSent++;
ResetPerTupleExprContext(executorState);
return true;
}
/*
* WriteToLocalResultsFile writes the bytes in a StringInfo to a local file.
*/
static void
WriteToLocalFile(StringInfo copyData, File fileDesc)
{
#if (PG_VERSION_NUM >= 100000)
int bytesWritten = FileWrite(fileDesc, copyData->data, copyData->len, PG_WAIT_IO);
#else
int bytesWritten = FileWrite(fileDesc, copyData->data, copyData->len);
#endif
if (bytesWritten < 0)
{
ereport(ERROR, (errcode_for_file_access(),
errmsg("could not append to file: %m")));
}
}
/*
* RemoteFileDestReceiverShutdown implements the rShutdown interface of
* RemoteFileDestReceiver. It ends the COPY on all the open connections and closes
* the relation.
*/
static void
RemoteFileDestReceiverShutdown(DestReceiver *destReceiver)
{
RemoteFileDestReceiver *resultDest = (RemoteFileDestReceiver *) destReceiver;
List *connectionList = resultDest->connectionList;
CopyOutState copyOutState = resultDest->copyOutState;
if (copyOutState->binary)
{
/* send footers when using binary encoding */
resetStringInfo(copyOutState->fe_msgbuf);
AppendCopyBinaryFooters(copyOutState);
BroadcastCopyData(copyOutState->fe_msgbuf, connectionList);
if (resultDest->writeLocalFile)
{
WriteToLocalFile(copyOutState->fe_msgbuf, resultDest->fileDesc);
}
}
/* close the COPY input */
EndRemoteCopy(0, connectionList, true);
if (resultDest->writeLocalFile)
{
FileClose(resultDest->fileDesc);
}
}
/*
* BroadcastCopyData sends copy data to all connections in a list.
*/
static void
BroadcastCopyData(StringInfo dataBuffer, List *connectionList)
{
ListCell *connectionCell = NULL;
foreach(connectionCell, connectionList)
{
MultiConnection *connection = (MultiConnection *) lfirst(connectionCell);
SendCopyDataOverConnection(dataBuffer, connection);
}
}
/*
* SendCopyDataOverConnection sends serialized COPY data over the given
* connection.
*/
static void
SendCopyDataOverConnection(StringInfo dataBuffer, MultiConnection *connection)
{
if (!PutRemoteCopyData(connection, dataBuffer->data, dataBuffer->len))
{
ReportConnectionError(connection, ERROR);
}
}
/*
* RemoteFileDestReceiverDestroy frees memory allocated as part of the
* RemoteFileDestReceiver and closes file descriptors.
*/
static void
RemoteFileDestReceiverDestroy(DestReceiver *destReceiver)
{
RemoteFileDestReceiver *resultDest = (RemoteFileDestReceiver *) destReceiver;
if (resultDest->copyOutState)
{
pfree(resultDest->copyOutState);
}
if (resultDest->columnOutputFunctions)
{
pfree(resultDest->columnOutputFunctions);
}
pfree(resultDest);
}
/*
* ReceiveQueryResultViaCopy is called when a COPY "resultid" FROM
* STDIN WITH (format result) command is received from the client.
* The command is followed by the raw copy data stream, which is
* redirected to a file.
*
* File names are automatically prefixed with the user OID. Users
* are only allowed to read query results from their own directory.
*/
void
ReceiveQueryResultViaCopy(const char *resultId)
{
const char *resultFileName = NULL;
CreateIntermediateResultsDirectory();
resultFileName = QueryResultFileName(resultId);
RedirectCopyDataToRegularFile(resultFileName);
}
/*
* CreateIntermediateResultsDirectory creates the intermediate result
* directory for the current transaction if it does not exist and ensures
* that the directory is removed at the end of the transaction.
*/
static char *
CreateIntermediateResultsDirectory(void)
{
char *resultDirectory = IntermediateResultsDirectory();
int makeOK = 0;
if (!CreatedResultsDirectory)
{
makeOK = mkdir(resultDirectory, S_IRWXU);
if (makeOK != 0 && errno != EEXIST)
{
ereport(ERROR, (errcode_for_file_access(),
errmsg("could not create intermediate results directory "
"\"%s\": %m",
resultDirectory)));
}
CreatedResultsDirectory = true;
}
return resultDirectory;
}
/*
* QueryResultFileName returns the file name in which to store
* an intermediate result with the given key in the per transaction
* result directory.
*/
static char *
QueryResultFileName(const char *resultId)
{
StringInfo resultFileName = makeStringInfo();
const char *resultDirectory = IntermediateResultsDirectory();
char *checkChar = (char *) resultId;
for (; *checkChar; checkChar++)
{
if (!((*checkChar >= 'a' && *checkChar <= 'z') ||
(*checkChar >= 'A' && *checkChar <= 'Z') ||
(*checkChar >= '0' && *checkChar <= '9') ||
(*checkChar == '_') || (*checkChar == '-')))
{
ereport(ERROR, (errcode(ERRCODE_INVALID_NAME),
errmsg("result key \"%s\" contains invalid character",
resultId),
errhint("Result keys may only contain letters, numbers, "
"underscores and hyphens.")));
}
}
appendStringInfo(resultFileName, "%s/%s.data",
resultDirectory, resultId);
return resultFileName->data;
}
/*
* IntermediateResultsDirectory returns the directory to use for a query result
* file with a particular key. The filename includes the user OID, such
* that users can never read each other's files.
*
* In a distributed transaction, the directory has the form:
* base/pgsql_job_cache/<user id>_<coordinator node id>_<transaction number>/
*
* In a non-distributed transaction, the directory has the form:
* base/pgsql_job_cache/<user id>_<process id>/
*
* The latter form can be used for testing COPY ... WITH (format result) without
* assigning a distributed transaction ID.
*
* The pgsql_job_cache directory is emptied on restart in case of failure.
*/
static char *
IntermediateResultsDirectory(void)
{
StringInfo resultFileName = makeStringInfo();
Oid userId = GetUserId();
DistributedTransactionId *transactionId = GetCurrentDistributedTransactionId();
int initiatorNodeIdentifier = transactionId->initiatorNodeIdentifier;
uint64 transactionNumber = transactionId->transactionNumber;
if (transactionNumber > 0)
{
appendStringInfo(resultFileName, "base/" PG_JOB_CACHE_DIR "/%u_%u_%lu",
userId, initiatorNodeIdentifier, transactionNumber);
}
else
{
appendStringInfo(resultFileName, "base/" PG_JOB_CACHE_DIR "/%u_%u",
userId, MyProcPid);
}
return resultFileName->data;
}
/*
* RemoveIntermediateResultsDirectory removes the intermediate result directory
* for the current distributed transaction, if any was created.
*/
void
RemoveIntermediateResultsDirectory(void)
{
if (CreatedResultsDirectory)
{
StringInfo resultsDirectory = makeStringInfo();
appendStringInfoString(resultsDirectory, IntermediateResultsDirectory());
CitusRemoveDirectory(resultsDirectory);
CreatedResultsDirectory = false;
}
}
/*
* read_intermediate_result is a UDF that returns a COPY-formatted intermediate
* result file as a set of records. The file is parsed according to the columns
* definition list specified by the user, e.g.:
*
* SELECT * FROM read_intermediate_result('foo', 'csv') AS (a int, b int)
*
* The file is read from the directory returned by IntermediateResultsDirectory,
* which includes the user ID.
*
* read_intermediate_result is a volatile function because it cannot be
* evaluated until execution time, but for distributed planning purposes we can
* treat it in the same way as immutable functions and reference tables, since
* we know it will return the same result on all nodes.
*/
Datum
read_intermediate_result(PG_FUNCTION_ARGS)
{
ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
text *resultIdText = PG_GETARG_TEXT_P(0);
char *resultIdString = text_to_cstring(resultIdText);
Datum copyFormatOidDatum = PG_GETARG_DATUM(1);
Datum copyFormatLabelDatum = DirectFunctionCall1(enum_out, copyFormatOidDatum);
char *copyFormatLabel = DatumGetCString(copyFormatLabelDatum);
char *resultFileName = NULL;
struct stat fileStat;
int statOK = 0;
Tuplestorestate *tupstore = NULL;
TupleDesc tupleDescriptor = NULL;
MemoryContext oldcontext = NULL;
CheckCitusVersion(ERROR);
resultFileName = QueryResultFileName(resultIdString);
statOK = stat(resultFileName, &fileStat);
if (statOK != 0)
{
ereport(ERROR, (errcode_for_file_access(),
errmsg("result \"%s\" does not exist", resultIdString)));
}
/* check to see if query supports us returning a tuplestore */
if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
{
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg(
"set-valued function called in context that cannot accept a set")));
}
if (!(rsinfo->allowedModes & SFRM_Materialize))
{
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg(
"materialize mode required, but it is not allowed in this context")));
}
/* get a tuple descriptor for our result type */
switch (get_call_result_type(fcinfo, NULL, &tupleDescriptor))
{
case TYPEFUNC_COMPOSITE:
{
/* success */
break;
}
case TYPEFUNC_RECORD:
{
/* failed to determine actual type of RECORD */
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("function returning record called in context "
"that cannot accept type record")));
break;
}
default:
{
/* result type isn't composite */
elog(ERROR, "return type must be a row type");
break;
}
}
tupleDescriptor = CreateTupleDescCopy(tupleDescriptor);
oldcontext = MemoryContextSwitchTo(rsinfo->econtext->ecxt_per_query_memory);
tupstore = tuplestore_begin_heap(true, false, work_mem);
rsinfo->returnMode = SFRM_Materialize;
rsinfo->setResult = tupstore;
rsinfo->setDesc = tupleDescriptor;
MemoryContextSwitchTo(oldcontext);
ReadFileIntoTupleStore(resultFileName, copyFormatLabel, tupleDescriptor, tupstore);
tuplestore_donestoring(tupstore);
return (Datum) 0;
}

View File

@ -123,6 +123,8 @@ LoadTuplesIntoTupleStore(CitusScanState *citusScanState, Job *workerJob)
ReadFileIntoTupleStore(taskFilename->data, copyFormat, tupleDescriptor, ReadFileIntoTupleStore(taskFilename->data, copyFormat, tupleDescriptor,
citusScanState->tuplestorestate); citusScanState->tuplestorestate);
} }
tuplestore_donestoring(citusScanState->tuplestorestate);
} }

View File

@ -37,6 +37,7 @@
#include "commands/prepare.h" #include "commands/prepare.h"
#include "distributed/citus_ruleutils.h" #include "distributed/citus_ruleutils.h"
#include "distributed/colocation_utils.h" #include "distributed/colocation_utils.h"
#include "distributed/intermediate_results.h"
#include "distributed/maintenanced.h" #include "distributed/maintenanced.h"
#include "distributed/master_metadata_utility.h" #include "distributed/master_metadata_utility.h"
#include "distributed/master_protocol.h" #include "distributed/master_protocol.h"
@ -108,6 +109,7 @@ static bool IsCitusExtensionStmt(Node *parsetree);
/* Local functions forward declarations for Transmit statement */ /* Local functions forward declarations for Transmit statement */
static bool IsTransmitStmt(Node *parsetree); static bool IsTransmitStmt(Node *parsetree);
static void VerifyTransmitStmt(CopyStmt *copyStatement); static void VerifyTransmitStmt(CopyStmt *copyStatement);
static bool IsCopyResultStmt(CopyStmt *copyStatement);
/* Local functions forward declarations for processing distributed table commands */ /* Local functions forward declarations for processing distributed table commands */
static Node * ProcessCopyStmt(CopyStmt *copyStatement, char *completionTag, static Node * ProcessCopyStmt(CopyStmt *copyStatement, char *completionTag,
@ -706,6 +708,34 @@ VerifyTransmitStmt(CopyStmt *copyStatement)
} }
/*
* IsCopyResultStmt determines whether the given copy statement is a
* COPY "resultkey" FROM STDIN WITH (format result) statement, which is used
* to copy query results from the coordinator into workers.
*/
static bool
IsCopyResultStmt(CopyStmt *copyStatement)
{
ListCell *optionCell = NULL;
bool hasFormatReceive = false;
/* extract WITH (...) options from the COPY statement */
foreach(optionCell, copyStatement->options)
{
DefElem *defel = (DefElem *) lfirst(optionCell);
if (strncmp(defel->defname, "format", NAMEDATALEN) == 0 &&
strncmp(defGetString(defel), "result", NAMEDATALEN) == 0)
{
hasFormatReceive = true;
break;
}
}
return hasFormatReceive;
}
/* /*
* ProcessCopyStmt handles Citus specific concerns for COPY like supporting * ProcessCopyStmt handles Citus specific concerns for COPY like supporting
* COPYing from distributed tables and preventing unsupported actions. The * COPYing from distributed tables and preventing unsupported actions. The
@ -723,6 +753,19 @@ ProcessCopyStmt(CopyStmt *copyStatement, char *completionTag, bool *commandMustR
{ {
*commandMustRunAsOwner = false; /* make sure variable is initialized */ *commandMustRunAsOwner = false; /* make sure variable is initialized */
/*
* Handle special COPY "resultid" FROM STDIN WITH (format result) commands
* for sending intermediate results to workers.
*/
if (IsCopyResultStmt(copyStatement))
{
const char *resultId = copyStatement->relation->relname;
ReceiveQueryResultViaCopy(resultId);
return NULL;
}
/* /*
* We check whether a distributed relation is affected. For that, we need to open the * We check whether a distributed relation is affected. For that, we need to open the
* relation. To prevent race conditions with later lookups, lock the table, and modify * relation. To prevent race conditions with later lookups, lock the table, and modify

View File

@ -22,6 +22,7 @@
#include "distributed/backend_data.h" #include "distributed/backend_data.h"
#include "distributed/connection_management.h" #include "distributed/connection_management.h"
#include "distributed/hash_helpers.h" #include "distributed/hash_helpers.h"
#include "distributed/intermediate_results.h"
#include "distributed/multi_shard_transaction.h" #include "distributed/multi_shard_transaction.h"
#include "distributed/transaction_management.h" #include "distributed/transaction_management.h"
#include "distributed/placement_connection.h" #include "distributed/placement_connection.h"
@ -154,6 +155,7 @@ CoordinatedTransactionCallback(XactEvent event, void *arg)
* transaction management. Do so before doing other work, so the * transaction management. Do so before doing other work, so the
* callbacks still can perform work if needed. * callbacks still can perform work if needed.
*/ */
RemoveIntermediateResultsDirectory();
ResetShardPlacementTransactionState(); ResetShardPlacementTransactionState();
if (CurrentCoordinatedTransactionState == COORD_TRANS_PREPARED) if (CurrentCoordinatedTransactionState == COORD_TRANS_PREPARED)
@ -191,6 +193,7 @@ CoordinatedTransactionCallback(XactEvent event, void *arg)
* transaction management. Do so before doing other work, so the * transaction management. Do so before doing other work, so the
* callbacks still can perform work if needed. * callbacks still can perform work if needed.
*/ */
RemoveIntermediateResultsDirectory();
ResetShardPlacementTransactionState(); ResetShardPlacementTransactionState();
/* handles both already prepared and open transactions */ /* handles both already prepared and open transactions */

View File

@ -0,0 +1,29 @@
/*-------------------------------------------------------------------------
*
* intermediate_results.h
* Functions for writing and reading intermediate results.
*
* Copyright (c) 2017, Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#ifndef INTERMEDIATE_RESULTS_H
#define INTERMEDIATE_RESULTS_H
#include "fmgr.h"
#include "distributed/multi_copy.h"
#include "nodes/execnodes.h"
#include "nodes/execnodes.h"
#include "nodes/pg_list.h"
#include "tcop/dest.h"
#include "utils/palloc.h"
extern void ReceiveQueryResultViaCopy(const char *resultId);
extern void RemoveIntermediateResultsDirectory(void);
#endif /* INTERMEDIATE_RESULTS_H */

View File

@ -12,11 +12,13 @@
#define TRANSMIT_H #define TRANSMIT_H
#include "lib/stringinfo.h" #include "lib/stringinfo.h"
#include "storage/fd.h"
/* Function declarations for transmitting files between two nodes */ /* Function declarations for transmitting files between two nodes */
extern void RedirectCopyDataToRegularFile(const char *filename); extern void RedirectCopyDataToRegularFile(const char *filename);
extern void SendRegularFile(const char *filename); extern void SendRegularFile(const char *filename);
extern File FileOpenForTransmit(const char *filename, int fileFlags, int fileMode);
/* Function declaration local to commands and worker modules */ /* Function declaration local to commands and worker modules */
extern void FreeStringInfo(StringInfo stringInfo); extern void FreeStringInfo(StringInfo stringInfo);

View File

@ -129,6 +129,8 @@ ALTER EXTENSION citus UPDATE TO '7.1-1';
ALTER EXTENSION citus UPDATE TO '7.1-2'; ALTER EXTENSION citus UPDATE TO '7.1-2';
ALTER EXTENSION citus UPDATE TO '7.1-3'; ALTER EXTENSION citus UPDATE TO '7.1-3';
ALTER EXTENSION citus UPDATE TO '7.1-4'; ALTER EXTENSION citus UPDATE TO '7.1-4';
ALTER EXTENSION citus UPDATE TO '7.2-1';
ALTER EXTENSION citus UPDATE TO '7.2-2';
-- show running version -- show running version
SHOW citus.version; SHOW citus.version;
citus.version citus.version

View File

@ -129,6 +129,8 @@ ALTER EXTENSION citus UPDATE TO '7.1-1';
ALTER EXTENSION citus UPDATE TO '7.1-2'; ALTER EXTENSION citus UPDATE TO '7.1-2';
ALTER EXTENSION citus UPDATE TO '7.1-3'; ALTER EXTENSION citus UPDATE TO '7.1-3';
ALTER EXTENSION citus UPDATE TO '7.1-4'; ALTER EXTENSION citus UPDATE TO '7.1-4';
ALTER EXTENSION citus UPDATE TO '7.2-1';
ALTER EXTENSION citus UPDATE TO '7.2-2';
-- show running version -- show running version
SHOW citus.version; SHOW citus.version;