diff --git a/src/backend/distributed/Makefile b/src/backend/distributed/Makefile index 9589cce4e..65d82409e 100644 --- a/src/backend/distributed/Makefile +++ b/src/backend/distributed/Makefile @@ -13,7 +13,7 @@ EXTVERSIONS = 5.0 5.0-1 5.0-2 \ 6.2-1 6.2-2 6.2-3 6.2-4 \ 7.0-1 7.0-2 7.0-3 7.0-4 7.0-5 7.0-6 7.0-7 7.0-8 7.0-9 7.0-10 7.0-11 7.0-12 7.0-13 7.0-14 7.0-15 \ 7.1-1 7.1-2 7.1-3 7.1-4 \ - 7.2-1 + 7.2-1 7.2-2 # All citus--*.sql files in the source directory DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql)) @@ -181,6 +181,8 @@ $(EXTENSION)--7.1-4.sql: $(EXTENSION)--7.1-3.sql $(EXTENSION)--7.1-3--7.1-4.sql cat $^ > $@ $(EXTENSION)--7.2-1.sql: $(EXTENSION)--7.1-4.sql $(EXTENSION)--7.1-4--7.2-1.sql cat $^ > $@ +$(EXTENSION)--7.2-2.sql: $(EXTENSION)--7.2-1.sql $(EXTENSION)--7.2-1--7.2-2.sql + cat $^ > $@ NO_PGXS = 1 diff --git a/src/backend/distributed/citus--7.2-1--7.2-2.sql b/src/backend/distributed/citus--7.2-1--7.2-2.sql new file mode 100644 index 000000000..31030dcb7 --- /dev/null +++ b/src/backend/distributed/citus--7.2-1--7.2-2.sql @@ -0,0 +1,24 @@ +/* citus--7.2-1--7.2-2 */ + +CREATE TYPE citus.copy_format AS ENUM ('csv', 'binary', 'text'); + +CREATE OR REPLACE FUNCTION pg_catalog.read_intermediate_result(result_id text, format citus.copy_format default 'csv') + RETURNS record + LANGUAGE C STRICT VOLATILE PARALLEL SAFE + AS 'MODULE_PATHNAME', $$read_intermediate_result$$; +COMMENT ON FUNCTION pg_catalog.read_intermediate_result(text,citus.copy_format) + IS 'read a file and return it as a set of records'; + +CREATE OR REPLACE FUNCTION pg_catalog.create_intermediate_result(result_id text, query text) + RETURNS bigint + LANGUAGE C STRICT VOLATILE + AS 'MODULE_PATHNAME', $$create_intermediate_result$$; +COMMENT ON FUNCTION pg_catalog.create_intermediate_result(text,text) + IS 'execute a query and write its results to a local result file'; + +CREATE OR REPLACE FUNCTION pg_catalog.broadcast_intermediate_result(result_id text, query text) + RETURNS bigint + LANGUAGE C STRICT VOLATILE + AS 'MODULE_PATHNAME', $$broadcast_intermediate_result$$; +COMMENT ON FUNCTION pg_catalog.broadcast_intermediate_result(text,text) + IS 'execute a query and write its results to a result file on all workers'; diff --git a/src/backend/distributed/citus.control b/src/backend/distributed/citus.control index b53a68b63..da3e9c553 100644 --- a/src/backend/distributed/citus.control +++ b/src/backend/distributed/citus.control @@ -1,6 +1,6 @@ # Citus extension comment = 'Citus distributed database' -default_version = '7.2-1' +default_version = '7.2-2' module_pathname = '$libdir/citus' relocatable = false schema = pg_catalog diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c index 8ff28ce08..11c6a4eee 100644 --- a/src/backend/distributed/commands/multi_copy.c +++ b/src/backend/distributed/commands/multi_copy.c @@ -97,22 +97,18 @@ static void OpenCopyConnections(CopyStmt *copyStatement, ShardConnections *shardConnections, bool stopOnFailure, bool useBinaryCopyFormat); -static bool CanUseBinaryCopyFormat(TupleDesc tupleDescription); static bool BinaryOutputFunctionDefined(Oid typeId); static List * MasterShardPlacementList(uint64 shardId); static List * RemoteFinalizedShardPlacementList(uint64 shardId); - static void SendCopyBinaryHeaders(CopyOutState copyOutState, int64 shardId, List *connectionList); static void SendCopyBinaryFooters(CopyOutState copyOutState, int64 shardId, List *connectionList); - static
StringInfo ConstructCopyStatement(CopyStmt *copyStatement, int64 shardId, bool useBinaryCopyFormat); static void SendCopyDataToAll(StringInfo dataBuffer, int64 shardId, List *connectionList); static void SendCopyDataToPlacement(StringInfo dataBuffer, int64 shardId, MultiConnection *connection); -static void EndRemoteCopy(int64 shardId, List *connectionList, bool stopOnFailure); static void ReportCopyError(MultiConnection *connection, PGresult *result); static uint32 AvailableColumnCount(TupleDesc tupleDescriptor); static int64 StartCopyToNewShard(ShardConnections *shardConnections, @@ -904,7 +900,7 @@ OpenCopyConnections(CopyStmt *copyStatement, ShardConnections *shardConnections, * worker nodes for user-defined types. If the function can not detect a binary * output function for any of the column, it returns false. */ -static bool +bool CanUseBinaryCopyFormat(TupleDesc tupleDescription) { bool useBinaryCopyFormat = true; @@ -1169,7 +1165,7 @@ SendCopyDataToPlacement(StringInfo dataBuffer, int64 shardId, MultiConnection *c * If stopOnFailure is true, then EndRemoteCopy reports an error on failure, * otherwise it reports a warning or continues. */ -static void +void EndRemoteCopy(int64 shardId, List *connectionList, bool stopOnFailure) { ListCell *connectionCell = NULL; diff --git a/src/backend/distributed/commands/transmit.c b/src/backend/distributed/commands/transmit.c index 562405269..c5dcb667b 100644 --- a/src/backend/distributed/commands/transmit.c +++ b/src/backend/distributed/commands/transmit.c @@ -17,6 +17,7 @@ #include "distributed/relay_utility.h" #include "distributed/transmit.h" +#include "distributed/worker_protocol.h" #include "distributed/version_compat.h" #include "libpq/libpq.h" #include "libpq/pqformat.h" @@ -24,7 +25,6 @@ /* Local functions forward declarations */ -static File FileOpenForTransmit(const char *filename, int fileFlags, int fileMode); static void SendCopyInStart(void); static void SendCopyOutStart(void); static void SendCopyDone(void); @@ -150,7 +150,7 @@ FreeStringInfo(StringInfo stringInfo) * the function returns the internal file handle for the opened file. On failure * the function errors out. */ -static File +File FileOpenForTransmit(const char *filename, int fileFlags, int fileMode) { File fileDesc = -1; diff --git a/src/backend/distributed/executor/insert_select_executor.c b/src/backend/distributed/executor/insert_select_executor.c index b71082747..9e55680e4 100644 --- a/src/backend/distributed/executor/insert_select_executor.c +++ b/src/backend/distributed/executor/insert_select_executor.c @@ -36,8 +36,6 @@ static void ExecuteSelectIntoRelation(Oid targetRelationId, List *insertTargetList, Query *selectQuery, EState *executorState); -static void ExecuteIntoDestReceiver(Query *query, ParamListInfo params, - DestReceiver *dest); /* @@ -137,50 +135,9 @@ ExecuteSelectIntoRelation(Oid targetRelationId, List *insertTargetList, partitionColumnIndex, executorState, stopOnFailure); - ExecuteIntoDestReceiver(selectQuery, paramListInfo, (DestReceiver *) copyDest); + ExecuteQueryIntoDestReceiver(selectQuery, paramListInfo, (DestReceiver *) copyDest); executorState->es_processed = copyDest->tuplesSent; XactModificationLevel = XACT_MODIFICATION_DATA; } - - -/* - * ExecuteIntoDestReceiver plans and executes a query and sends results to the given - * DestReceiver. 
- */ -static void -ExecuteIntoDestReceiver(Query *query, ParamListInfo params, DestReceiver *dest) -{ - PlannedStmt *queryPlan = NULL; - Portal portal = NULL; - int eflags = 0; - int cursorOptions = 0; - long count = FETCH_ALL; - - /* create a new portal for executing the query */ - portal = CreateNewPortal(); - - /* don't display the portal in pg_cursors, it is for internal use only */ - portal->visible = false; - - cursorOptions = CURSOR_OPT_PARALLEL_OK; - - /* plan the subquery, this may be another distributed query */ - queryPlan = pg_plan_query(query, cursorOptions, params); - - PortalDefineQuery(portal, - NULL, - "", - "SELECT", - list_make1(queryPlan), - NULL); - - PortalStart(portal, params, eflags, GetActiveSnapshot()); -#if (PG_VERSION_NUM >= 100000) - PortalRun(portal, count, false, true, dest, dest, NULL); -#else - PortalRun(portal, count, false, dest, dest, NULL); -#endif - PortalDrop(portal, false); -} diff --git a/src/backend/distributed/executor/intermediate_results.c b/src/backend/distributed/executor/intermediate_results.c new file mode 100644 index 000000000..54acd5918 --- /dev/null +++ b/src/backend/distributed/executor/intermediate_results.c @@ -0,0 +1,755 @@ +/*------------------------------------------------------------------------- + * + * intermediate_results.c + * Functions for writing and reading intermediate results. + * + * Copyright (c) 2017, Citus Data, Inc. + * + *------------------------------------------------------------------------- + */ +#include <sys/stat.h> +#include <unistd.h> + +#include "postgres.h" +#include "funcapi.h" +#include "libpq-fe.h" +#include "miscadmin.h" +#include "pgstat.h" + +#include "catalog/pg_enum.h" +#include "commands/copy.h" +#include "distributed/connection_management.h" +#include "distributed/intermediate_results.h" +#include "distributed/master_metadata_utility.h" +#include "distributed/metadata_cache.h" +#include "distributed/multi_copy.h" +#include "distributed/multi_executor.h" +#include "distributed/remote_commands.h" +#include "distributed/transmit.h" +#include "distributed/transaction_identifier.h" +#include "distributed/worker_protocol.h" +#include "nodes/makefuncs.h" +#include "nodes/parsenodes.h" +#include "nodes/primnodes.h" +#include "storage/fd.h" +#include "tcop/tcopprot.h" +#include "utils/builtins.h" +#include "utils/lsyscache.h" +#include "utils/memutils.h" +#include "utils/syscache.h" + + +static bool CreatedResultsDirectory = false; + + +/* RemoteFileDestReceiver can be used to stream query results to (remote) result files */ +typedef struct RemoteFileDestReceiver +{ + /* public DestReceiver interface */ + DestReceiver pub; + + char *resultId; + + /* descriptor of the tuples that are sent to the worker */ + TupleDesc tupleDescriptor; + + /* EState for per-tuple memory allocation */ + EState *executorState; + + /* MemoryContext for DestReceiver session */ + MemoryContext memoryContext; + + /* worker nodes to send data to */ + List *initialNodeList; + List *connectionList; + + /* whether to write to a local file */ + bool writeLocalFile; + File fileDesc; + + /* state on how to copy out data types */ + CopyOutState copyOutState; + FmgrInfo *columnOutputFunctions; + + /* number of tuples sent */ + uint64 tuplesSent; +} RemoteFileDestReceiver; + + +static RemoteFileDestReceiver * CreateRemoteFileDestReceiver(char *resultId, + EState *executorState, + List *initialNodeList, + bool writeLocalFile); +static void RemoteFileDestReceiverStartup(DestReceiver *dest, int operation, + TupleDesc inputTupleDescriptor); +static StringInfo
ConstructCopyResultStatement(const char *resultId); +static void WriteToLocalFile(StringInfo copyData, File fileDesc); +static bool RemoteFileDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest); +static void BroadcastCopyData(StringInfo dataBuffer, List *connectionList); +static void SendCopyDataOverConnection(StringInfo dataBuffer, + MultiConnection *connection); +static void RemoteFileDestReceiverShutdown(DestReceiver *destReceiver); +static void RemoteFileDestReceiverDestroy(DestReceiver *destReceiver); + +static char * CreateIntermediateResultsDirectory(void); +static char * IntermediateResultsDirectory(void); +static char * QueryResultFileName(const char *resultId); + + +/* exports for SQL callable functions */ +PG_FUNCTION_INFO_V1(read_intermediate_result); +PG_FUNCTION_INFO_V1(broadcast_intermediate_result); +PG_FUNCTION_INFO_V1(create_intermediate_result); + + +/* + * broadcast_intermediate_result executes a query and streams the results + * into a file on all workers. + */ +Datum +broadcast_intermediate_result(PG_FUNCTION_ARGS) +{ + text *resultIdText = PG_GETARG_TEXT_P(0); + char *resultIdString = text_to_cstring(resultIdText); + text *queryText = PG_GETARG_TEXT_P(1); + char *queryString = text_to_cstring(queryText); + EState *estate = NULL; + List *nodeList = NIL; + bool writeLocalFile = false; + RemoteFileDestReceiver *resultDest = NULL; + ParamListInfo paramListInfo = NULL; + + CheckCitusVersion(ERROR); + + nodeList = ActivePrimaryNodeList(); + estate = CreateExecutorState(); + resultDest = CreateRemoteFileDestReceiver(resultIdString, estate, nodeList, + writeLocalFile); + + ExecuteQueryStringIntoDestReceiver(queryString, paramListInfo, + (DestReceiver *) resultDest); + + FreeExecutorState(estate); + + PG_RETURN_INT64(resultDest->tuplesSent); +} + + +/* + * create_intermediate_result executes a query and writes the results + * into a local file. + */ +Datum +create_intermediate_result(PG_FUNCTION_ARGS) +{ + text *resultIdText = PG_GETARG_TEXT_P(0); + char *resultIdString = text_to_cstring(resultIdText); + text *queryText = PG_GETARG_TEXT_P(1); + char *queryString = text_to_cstring(queryText); + EState *estate = NULL; + List *nodeList = NIL; + bool writeLocalFile = true; + RemoteFileDestReceiver *resultDest = NULL; + ParamListInfo paramListInfo = NULL; + + CheckCitusVersion(ERROR); + + estate = CreateExecutorState(); + resultDest = CreateRemoteFileDestReceiver(resultIdString, estate, nodeList, + writeLocalFile); + + ExecuteQueryStringIntoDestReceiver(queryString, paramListInfo, + (DestReceiver *) resultDest); + + FreeExecutorState(estate); + + PG_RETURN_INT64(resultDest->tuplesSent); +} + + +/* + * CreateRemoteFileDestReceiver creates a DestReceiver that streams results + * to a set of worker nodes. 
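 + * + * When writeLocalFile is set, the results are also written to a local + * result file (create_intermediate_result passes an empty node list and + * only writes the local file).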
+ */ +static RemoteFileDestReceiver * +CreateRemoteFileDestReceiver(char *resultId, EState *executorState, + List *initialNodeList, bool writeLocalFile) +{ + RemoteFileDestReceiver *resultDest = NULL; + + resultDest = (RemoteFileDestReceiver *) palloc0(sizeof(RemoteFileDestReceiver)); + + /* set up the DestReceiver function pointers */ + resultDest->pub.receiveSlot = RemoteFileDestReceiverReceive; + resultDest->pub.rStartup = RemoteFileDestReceiverStartup; + resultDest->pub.rShutdown = RemoteFileDestReceiverShutdown; + resultDest->pub.rDestroy = RemoteFileDestReceiverDestroy; + resultDest->pub.mydest = DestCopyOut; + + /* set up output parameters */ + resultDest->resultId = resultId; + resultDest->executorState = executorState; + resultDest->initialNodeList = initialNodeList; + resultDest->memoryContext = CurrentMemoryContext; + resultDest->writeLocalFile = writeLocalFile; + + return resultDest; +} + + +/* + * RemoteFileDestReceiverStartup implements the rStartup interface of + * RemoteFileDestReceiver. It opens connections to the target nodes, starts + * a COPY into the result file on each of them, and opens the local result + * file if requested. + */ +static void +RemoteFileDestReceiverStartup(DestReceiver *dest, int operation, + TupleDesc inputTupleDescriptor) +{ + RemoteFileDestReceiver *resultDest = (RemoteFileDestReceiver *) dest; + + const char *resultId = resultDest->resultId; + + CopyOutState copyOutState = NULL; + const char *delimiterCharacter = "\t"; + const char *nullPrintCharacter = "\\N"; + + List *initialNodeList = resultDest->initialNodeList; + ListCell *initialNodeCell = NULL; + List *connectionList = NIL; + ListCell *connectionCell = NULL; + + resultDest->tupleDescriptor = inputTupleDescriptor; + + /* define how tuples will be serialized */ + copyOutState = (CopyOutState) palloc0(sizeof(CopyOutStateData)); + copyOutState->delim = (char *) delimiterCharacter; + copyOutState->null_print = (char *) nullPrintCharacter; + copyOutState->null_print_client = (char *) nullPrintCharacter; + copyOutState->binary = CanUseBinaryCopyFormat(inputTupleDescriptor); + copyOutState->fe_msgbuf = makeStringInfo(); + copyOutState->rowcontext = GetPerTupleMemoryContext(resultDest->executorState); + resultDest->copyOutState = copyOutState; + + resultDest->columnOutputFunctions = ColumnOutputFunctions(inputTupleDescriptor, + copyOutState->binary); + + /* + * Make sure that this transaction has a distributed transaction ID. + * + * Intermediate results will be stored in a directory that is derived from + * the distributed transaction ID across all workers and on the coordinator + * itself. Even if we only store results locally, we still want to assign + * a transaction ID in case we later store results on workers. + * + * When we start using broadcast_intermediate_result from workers, we + * need to make sure that we don't override the transaction ID here.
+ */ + BeginOrContinueCoordinatedTransaction(); + + if (resultDest->writeLocalFile) + { + const int fileFlags = (O_APPEND | O_CREAT | O_RDWR | O_TRUNC | PG_BINARY); + const int fileMode = (S_IRUSR | S_IWUSR); + const char *fileName = NULL; + + /* make sure the directory exists */ + CreateIntermediateResultsDirectory(); + + fileName = QueryResultFileName(resultId); + + elog(DEBUG1, "writing to local file \"%s\"", fileName); + + resultDest->fileDesc = FileOpenForTransmit(fileName, fileFlags, fileMode); + } + + foreach(initialNodeCell, initialNodeList) + { + WorkerNode *workerNode = (WorkerNode *) lfirst(initialNodeCell); + int connectionFlags = 0; + char *nodeName = workerNode->workerName; + int nodePort = workerNode->workerPort; + MultiConnection *connection = NULL; + + connection = StartNodeConnection(connectionFlags, nodeName, nodePort); + ClaimConnectionExclusively(connection); + MarkRemoteTransactionCritical(connection); + + connectionList = lappend(connectionList, connection); + } + + FinishConnectionListEstablishment(connectionList); + + /* must open transaction blocks to use intermediate results */ + RemoteTransactionsBeginIfNecessary(connectionList); + + foreach(connectionCell, connectionList) + { + MultiConnection *connection = (MultiConnection *) lfirst(connectionCell); + StringInfo copyCommand = NULL; + bool querySent = false; + + copyCommand = ConstructCopyResultStatement(resultId); + + querySent = SendRemoteCommand(connection, copyCommand->data); + if (!querySent) + { + ReportConnectionError(connection, ERROR); + } + } + + foreach(connectionCell, connectionList) + { + MultiConnection *connection = (MultiConnection *) lfirst(connectionCell); + bool raiseInterrupts = true; + + PGresult *result = GetRemoteCommandResult(connection, raiseInterrupts); + if (PQresultStatus(result) != PGRES_COPY_IN) + { + ReportResultError(connection, result, ERROR); + } + + PQclear(result); + } + + if (copyOutState->binary) + { + /* send headers when using binary encoding */ + resetStringInfo(copyOutState->fe_msgbuf); + AppendCopyBinaryHeaders(copyOutState); + BroadcastCopyData(copyOutState->fe_msgbuf, connectionList); + + if (resultDest->writeLocalFile) + { + WriteToLocalFile(copyOutState->fe_msgbuf, resultDest->fileDesc); + } + } + + resultDest->connectionList = connectionList; +} + + +/* + * ConstructCopyResultStatement constructs the text of a COPY statement + * for copying into a result file. + */ +static StringInfo +ConstructCopyResultStatement(const char *resultId) +{ + StringInfo command = makeStringInfo(); + + appendStringInfo(command, "COPY \"%s\" FROM STDIN WITH (format result)", + resultId); + + return command; +} + + +/* + * RemoteFileDestReceiverReceive implements the receiveSlot function of + * RemoteFileDestReceiver. It takes a TupleTableSlot and sends the contents to + * all worker nodes. 
+ */ +static bool +RemoteFileDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest) +{ + RemoteFileDestReceiver *resultDest = (RemoteFileDestReceiver *) dest; + + TupleDesc tupleDescriptor = resultDest->tupleDescriptor; + + List *connectionList = resultDest->connectionList; + CopyOutState copyOutState = resultDest->copyOutState; + FmgrInfo *columnOutputFunctions = resultDest->columnOutputFunctions; + + Datum *columnValues = NULL; + bool *columnNulls = NULL; + StringInfo copyData = copyOutState->fe_msgbuf; + + EState *executorState = resultDest->executorState; + MemoryContext executorTupleContext = GetPerTupleMemoryContext(executorState); + MemoryContext oldContext = MemoryContextSwitchTo(executorTupleContext); + + slot_getallattrs(slot); + + columnValues = slot->tts_values; + columnNulls = slot->tts_isnull; + + resetStringInfo(copyData); + + /* construct row in COPY format */ + AppendCopyRowData(columnValues, columnNulls, tupleDescriptor, + copyOutState, columnOutputFunctions, NULL); + + /* send row to nodes */ + BroadcastCopyData(copyData, connectionList); + + /* write to local file (if applicable) */ + if (resultDest->writeLocalFile) + { + WriteToLocalFile(copyOutState->fe_msgbuf, resultDest->fileDesc); + } + + MemoryContextSwitchTo(oldContext); + + resultDest->tuplesSent++; + + ResetPerTupleExprContext(executorState); + + return true; +} + + +/* + * WriteToLocalFile writes the bytes in a StringInfo to a local file. + */ +static void +WriteToLocalFile(StringInfo copyData, File fileDesc) +{ +#if (PG_VERSION_NUM >= 100000) + int bytesWritten = FileWrite(fileDesc, copyData->data, copyData->len, PG_WAIT_IO); +#else + int bytesWritten = FileWrite(fileDesc, copyData->data, copyData->len); +#endif + if (bytesWritten < 0) + { + ereport(ERROR, (errcode_for_file_access(), + errmsg("could not append to file: %m"))); + } +} + + +/* + * RemoteFileDestReceiverShutdown implements the rShutdown interface of + * RemoteFileDestReceiver. It ends the COPY on all the open connections and closes + * the local result file, if any. + */ +static void +RemoteFileDestReceiverShutdown(DestReceiver *destReceiver) +{ + RemoteFileDestReceiver *resultDest = (RemoteFileDestReceiver *) destReceiver; + + List *connectionList = resultDest->connectionList; + CopyOutState copyOutState = resultDest->copyOutState; + + if (copyOutState->binary) + { + /* send footers when using binary encoding */ + resetStringInfo(copyOutState->fe_msgbuf); + AppendCopyBinaryFooters(copyOutState); + BroadcastCopyData(copyOutState->fe_msgbuf, connectionList); + + if (resultDest->writeLocalFile) + { + WriteToLocalFile(copyOutState->fe_msgbuf, resultDest->fileDesc); + } + } + + /* close the COPY input */ + EndRemoteCopy(0, connectionList, true); + + if (resultDest->writeLocalFile) + { + FileClose(resultDest->fileDesc); + } +} + + +/* + * BroadcastCopyData sends copy data to all connections in a list. + */ +static void +BroadcastCopyData(StringInfo dataBuffer, List *connectionList) +{ + ListCell *connectionCell = NULL; + foreach(connectionCell, connectionList) + { + MultiConnection *connection = (MultiConnection *) lfirst(connectionCell); + SendCopyDataOverConnection(dataBuffer, connection); + } +} + + +/* + * SendCopyDataOverConnection sends serialized COPY data over the given + * connection.
+ */ +static void +SendCopyDataOverConnection(StringInfo dataBuffer, MultiConnection *connection) +{ + if (!PutRemoteCopyData(connection, dataBuffer->data, dataBuffer->len)) + { + ReportConnectionError(connection, ERROR); + } +} + + +/* + * RemoteFileDestReceiverDestroy frees memory allocated as part of the + * RemoteFileDestReceiver and closes file descriptors. + */ +static void +RemoteFileDestReceiverDestroy(DestReceiver *destReceiver) +{ + RemoteFileDestReceiver *resultDest = (RemoteFileDestReceiver *) destReceiver; + + if (resultDest->copyOutState) + { + pfree(resultDest->copyOutState); + } + + if (resultDest->columnOutputFunctions) + { + pfree(resultDest->columnOutputFunctions); + } + + pfree(resultDest); +} + + +/* + * ReceiveQueryResultViaCopy is called when a COPY "resultid" FROM + * STDIN WITH (format result) command is received from the client. + * The command is followed by the raw copy data stream, which is + * redirected to a file. + * + * File names are automatically prefixed with the user OID. Users + * are only allowed to read query results from their own directory. + */ +void +ReceiveQueryResultViaCopy(const char *resultId) +{ + const char *resultFileName = NULL; + + CreateIntermediateResultsDirectory(); + + resultFileName = QueryResultFileName(resultId); + + RedirectCopyDataToRegularFile(resultFileName); +} + + +/* + * CreateIntermediateResultsDirectory creates the intermediate result + * directory for the current transaction if it does not exist and ensures + * that the directory is removed at the end of the transaction. + */ +static char * +CreateIntermediateResultsDirectory(void) +{ + char *resultDirectory = IntermediateResultsDirectory(); + int makeOK = 0; + + if (!CreatedResultsDirectory) + { + makeOK = mkdir(resultDirectory, S_IRWXU); + if (makeOK != 0 && errno != EEXIST) + { + ereport(ERROR, (errcode_for_file_access(), + errmsg("could not create intermediate results directory " + "\"%s\": %m", + resultDirectory))); + } + + CreatedResultsDirectory = true; + } + + return resultDirectory; +} + + +/* + * QueryResultFileName returns the file name in which to store + * an intermediate result with the given key in the per-transaction + * result directory. + */ +static char * +QueryResultFileName(const char *resultId) +{ + StringInfo resultFileName = makeStringInfo(); + const char *resultDirectory = IntermediateResultsDirectory(); + char *checkChar = (char *) resultId; + + for (; *checkChar; checkChar++) + { + if (!((*checkChar >= 'a' && *checkChar <= 'z') || + (*checkChar >= 'A' && *checkChar <= 'Z') || + (*checkChar >= '0' && *checkChar <= '9') || + (*checkChar == '_') || (*checkChar == '-'))) + { + ereport(ERROR, (errcode(ERRCODE_INVALID_NAME), + errmsg("result key \"%s\" contains invalid character", + resultId), + errhint("Result keys may only contain letters, numbers, " + "underscores and hyphens."))); + } + } + + appendStringInfo(resultFileName, "%s/%s.data", + resultDirectory, resultId); + + return resultFileName->data; +} + + +/* + * IntermediateResultsDirectory returns the directory to use for query result + * files in the current transaction. The directory name includes the user OID, such + * that users can never read each other's files. + * + * In a distributed transaction, the directory has the form: + * base/pgsql_job_cache/<user id>_<initiator node id>_<transaction number>/ + * + * In a non-distributed transaction, the directory has the form: + * base/pgsql_job_cache/<user id>_<process id>/ + * + * The latter form can be used for testing COPY ... WITH (format result) without + * assigning a distributed transaction ID.
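 + * + * For example, user OID 10 in distributed transaction 42 initiated by + * node 1 would get the directory (hypothetical values): + * base/pgsql_job_cache/10_1_42/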
+ * + * The pgsql_job_cache directory is emptied on restart in case of failure. + */ +static char * +IntermediateResultsDirectory(void) +{ + StringInfo resultFileName = makeStringInfo(); + Oid userId = GetUserId(); + DistributedTransactionId *transactionId = GetCurrentDistributedTransactionId(); + int initiatorNodeIdentifier = transactionId->initiatorNodeIdentifier; + uint64 transactionNumber = transactionId->transactionNumber; + + if (transactionNumber > 0) + { + appendStringInfo(resultFileName, "base/" PG_JOB_CACHE_DIR "/%u_%u_%lu", + userId, initiatorNodeIdentifier, transactionNumber); + } + else + { + appendStringInfo(resultFileName, "base/" PG_JOB_CACHE_DIR "/%u_%u", + userId, MyProcPid); + } + + return resultFileName->data; +} + + +/* + * RemoveIntermediateResultsDirectory removes the intermediate result directory + * for the current distributed transaction, if any was created. + */ +void +RemoveIntermediateResultsDirectory(void) +{ + if (CreatedResultsDirectory) + { + StringInfo resultsDirectory = makeStringInfo(); + appendStringInfoString(resultsDirectory, IntermediateResultsDirectory()); + + CitusRemoveDirectory(resultsDirectory); + + CreatedResultsDirectory = false; + } +} + + +/* + * read_intermediate_result is a UDF that returns a COPY-formatted intermediate + * result file as a set of records. The file is parsed according to the column + * definition list specified by the user, e.g.: + * + * SELECT * FROM read_intermediate_result('foo', 'csv') AS (a int, b int) + * + * The file is read from the directory returned by IntermediateResultsDirectory, + * which includes the user ID. + * + * read_intermediate_result is a volatile function because it cannot be + * evaluated until execution time, but for distributed planning purposes we can + * treat it in the same way as immutable functions and reference tables, since + * we know it will return the same result on all nodes.
+ */ +Datum +read_intermediate_result(PG_FUNCTION_ARGS) +{ + ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; + + text *resultIdText = PG_GETARG_TEXT_P(0); + char *resultIdString = text_to_cstring(resultIdText); + Datum copyFormatOidDatum = PG_GETARG_DATUM(1); + Datum copyFormatLabelDatum = DirectFunctionCall1(enum_out, copyFormatOidDatum); + char *copyFormatLabel = DatumGetCString(copyFormatLabelDatum); + + char *resultFileName = NULL; + struct stat fileStat; + int statOK = 0; + + Tuplestorestate *tupstore = NULL; + TupleDesc tupleDescriptor = NULL; + MemoryContext oldcontext = NULL; + + CheckCitusVersion(ERROR); + + resultFileName = QueryResultFileName(resultIdString); + statOK = stat(resultFileName, &fileStat); + if (statOK != 0) + { + ereport(ERROR, (errcode_for_file_access(), + errmsg("result \"%s\" does not exist", resultIdString))); + } + + /* check to see if query supports us returning a tuplestore */ + if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo)) + { + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg( + "set-valued function called in context that cannot accept a set"))); + } + + if (!(rsinfo->allowedModes & SFRM_Materialize)) + { + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg( + "materialize mode required, but it is not allowed in this context"))); + } + + /* get a tuple descriptor for our result type */ + switch (get_call_result_type(fcinfo, NULL, &tupleDescriptor)) + { + case TYPEFUNC_COMPOSITE: + { + /* success */ + break; + } + + case TYPEFUNC_RECORD: + { + /* failed to determine actual type of RECORD */ + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("function returning record called in context " + "that cannot accept type record"))); + break; + } + + default: + { + /* result type isn't composite */ + elog(ERROR, "return type must be a row type"); + break; + } + } + + tupleDescriptor = CreateTupleDescCopy(tupleDescriptor); + + oldcontext = MemoryContextSwitchTo(rsinfo->econtext->ecxt_per_query_memory); + + tupstore = tuplestore_begin_heap(true, false, work_mem); + rsinfo->returnMode = SFRM_Materialize; + rsinfo->setResult = tupstore; + rsinfo->setDesc = tupleDescriptor; + MemoryContextSwitchTo(oldcontext); + + ReadFileIntoTupleStore(resultFileName, copyFormatLabel, tupleDescriptor, tupstore); + + tuplestore_donestoring(tupstore); + + return (Datum) 0; +} diff --git a/src/backend/distributed/executor/multi_executor.c b/src/backend/distributed/executor/multi_executor.c index 7216bc5f5..7a2f256ce 100644 --- a/src/backend/distributed/executor/multi_executor.c +++ b/src/backend/distributed/executor/multi_executor.c @@ -34,6 +34,7 @@ #include "nodes/makefuncs.h" #include "parser/parsetree.h" #include "storage/lmgr.h" +#include "tcop/pquery.h" #include "tcop/utility.h" #include "utils/snapmgr.h" #include "utils/memutils.h" @@ -93,29 +94,13 @@ LoadTuplesIntoTupleStore(CitusScanState *citusScanState, Job *workerJob) { CustomScanState customScanState = citusScanState->customScanState; List *workerTaskList = workerJob->taskList; - List *copyOptions = NIL; - EState *executorState = NULL; - MemoryContext executorTupleContext = NULL; - ExprContext *executorExpressionContext = NULL; TupleDesc tupleDescriptor = NULL; - Relation stubRelation = NULL; ListCell *workerTaskCell = NULL; - uint32 columnCount = 0; - Datum *columnValues = NULL; - bool *columnNulls = NULL; bool randomAccess = true; bool interTransactions = false; - - executorState = citusScanState->customScanState.ss.ps.state; - executorTupleContext = 
GetPerTupleMemoryContext(executorState); - executorExpressionContext = GetPerTupleExprContext(executorState); + char *copyFormat = "text"; tupleDescriptor = customScanState.ss.ps.ps_ResultTupleSlot->tts_tupleDescriptor; - stubRelation = StubRelation(tupleDescriptor); - - columnCount = tupleDescriptor->natts; - columnValues = palloc0(columnCount * sizeof(Datum)); - columnNulls = palloc0(columnCount * sizeof(bool)); Assert(citusScanState->tuplestorestate == NULL); citusScanState->tuplestorestate = @@ -123,16 +108,7 @@ LoadTuplesIntoTupleStore(CitusScanState *citusScanState, Job *workerJob) if (BinaryMasterCopyFormat) { - DefElem *copyOption = NULL; - -#if (PG_VERSION_NUM >= 100000) - int location = -1; /* "unknown" token location */ - copyOption = makeDefElem("format", (Node *) makeString("binary"), location); -#else - copyOption = makeDefElem("format", (Node *) makeString("binary")); -#endif - - copyOptions = lappend(copyOptions, copyOption); + copyFormat = "binary"; } foreach(workerTaskCell, workerTaskList) @@ -140,42 +116,85 @@ LoadTuplesIntoTupleStore(CitusScanState *citusScanState, Job *workerJob) Task *workerTask = (Task *) lfirst(workerTaskCell); StringInfo jobDirectoryName = NULL; StringInfo taskFilename = NULL; - CopyState copyState = NULL; jobDirectoryName = MasterJobDirectoryName(workerTask->jobId); taskFilename = TaskFilename(jobDirectoryName, workerTask->taskId); + ReadFileIntoTupleStore(taskFilename->data, copyFormat, tupleDescriptor, + citusScanState->tuplestorestate); + } + + tuplestore_donestoring(citusScanState->tuplestorestate); +} + + +/* + * ReadFileIntoTupleStore parses the records in a COPY-formatted file + * according to the given tuple descriptor and stores the records in a tuple + * store. + */ +void +ReadFileIntoTupleStore(char *fileName, char *copyFormat, TupleDesc tupleDescriptor, + Tuplestorestate *tupstore) +{ + CopyState copyState = NULL; + + /* + * Trick BeginCopyFrom into using our tuple descriptor by pretending it belongs + * to a relation.
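 + * BeginCopyFrom mainly needs the relation for its tuple descriptor, so a + * stub relation carrying only the descriptor appears to be sufficient here.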
+ */ + Relation stubRelation = StubRelation(tupleDescriptor); + + EState *executorState = CreateExecutorState(); + MemoryContext executorTupleContext = GetPerTupleMemoryContext(executorState); + ExprContext *executorExpressionContext = GetPerTupleExprContext(executorState); + + int columnCount = tupleDescriptor->natts; + Datum *columnValues = palloc0(columnCount * sizeof(Datum)); + bool *columnNulls = palloc0(columnCount * sizeof(bool)); + + DefElem *copyOption = NULL; + List *copyOptions = NIL; + #if (PG_VERSION_NUM >= 100000) - copyState = BeginCopyFrom(NULL, stubRelation, taskFilename->data, false, NULL, - NULL, copyOptions); + int location = -1; /* "unknown" token location */ + copyOption = makeDefElem("format", (Node *) makeString(copyFormat), location); #else - copyState = BeginCopyFrom(stubRelation, taskFilename->data, false, NULL, - copyOptions); + copyOption = makeDefElem("format", (Node *) makeString(copyFormat)); +#endif + copyOptions = lappend(copyOptions, copyOption); + +#if (PG_VERSION_NUM >= 100000) + copyState = BeginCopyFrom(NULL, stubRelation, fileName, false, NULL, + NULL, copyOptions); +#else + copyState = BeginCopyFrom(stubRelation, fileName, false, NULL, + copyOptions); #endif - while (true) + while (true) + { + MemoryContext oldContext = NULL; + bool nextRowFound = false; + + ResetPerTupleExprContext(executorState); + oldContext = MemoryContextSwitchTo(executorTupleContext); + + nextRowFound = NextCopyFrom(copyState, executorExpressionContext, + columnValues, columnNulls, NULL); + if (!nextRowFound) { - MemoryContext oldContext = NULL; - bool nextRowFound = false; - - ResetPerTupleExprContext(executorState); - oldContext = MemoryContextSwitchTo(executorTupleContext); - - nextRowFound = NextCopyFrom(copyState, executorExpressionContext, - columnValues, columnNulls, NULL); - if (!nextRowFound) - { - MemoryContextSwitchTo(oldContext); - break; - } - - tuplestore_putvalues(citusScanState->tuplestorestate, tupleDescriptor, - columnValues, columnNulls); MemoryContextSwitchTo(oldContext); + break; } - EndCopyFrom(copyState); + tuplestore_putvalues(tupstore, tupleDescriptor, columnValues, columnNulls); + MemoryContextSwitchTo(oldContext); } + + EndCopyFrom(copyState); + pfree(columnValues); + pfree(columnNulls); } @@ -195,3 +214,86 @@ StubRelation(TupleDesc tupleDescriptor) return stubRelation; } + + +/* + * ExecuteQueryStringIntoDestReceiver plans and executes a query and sends results + * to the given DestReceiver. + */ +void +ExecuteQueryStringIntoDestReceiver(const char *queryString, ParamListInfo params, + DestReceiver *dest) +{ + Query *query = NULL; + +#if (PG_VERSION_NUM >= 100000) + RawStmt *rawStmt = (RawStmt *) ParseTreeRawStmt(queryString); + List *queryTreeList = pg_analyze_and_rewrite(rawStmt, queryString, NULL, 0, NULL); +#else + Node *queryTreeNode = ParseTreeNode(queryString); + List *queryTreeList = pg_analyze_and_rewrite(queryTreeNode, queryString, NULL, 0); +#endif + + if (list_length(queryTreeList) != 1) + { + ereport(ERROR, (errmsg("can only execute a single query"))); + } + + query = (Query *) linitial(queryTreeList); + + ExecuteQueryIntoDestReceiver(query, params, dest); +} + + +/* + * ExecuteQueryIntoDestReceiver plans and executes a query and sends results to the given + * DestReceiver. 
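 + * + * For instance, the INSERT ... SELECT executor above uses this to feed the + * SELECT output into a CitusCopyDestReceiver.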
+ */ +void +ExecuteQueryIntoDestReceiver(Query *query, ParamListInfo params, DestReceiver *dest) +{ + PlannedStmt *queryPlan = NULL; + int cursorOptions = 0; + + cursorOptions = CURSOR_OPT_PARALLEL_OK; + + /* plan the subquery, this may be another distributed query */ + queryPlan = pg_plan_query(query, cursorOptions, params); + + ExecutePlanIntoDestReceiver(queryPlan, params, dest); +} + + +/* + * ExecutePlanIntoDestReceiver executes a query plan and sends results to the + * given DestReceiver. + */ +void +ExecutePlanIntoDestReceiver(PlannedStmt *queryPlan, ParamListInfo params, + DestReceiver *dest) +{ + Portal portal = NULL; + int eflags = 0; + long count = FETCH_ALL; + + /* create a new portal for executing the query */ + portal = CreateNewPortal(); + + /* don't display the portal in pg_cursors, it is for internal use only */ + portal->visible = false; + + PortalDefineQuery(portal, + NULL, + "", + "SELECT", + list_make1(queryPlan), + NULL); + + PortalStart(portal, params, eflags, GetActiveSnapshot()); +#if (PG_VERSION_NUM >= 100000) + PortalRun(portal, count, false, true, dest, dest, NULL); +#else + PortalRun(portal, count, false, dest, dest, NULL); +#endif + PortalDrop(portal, false); +} diff --git a/src/backend/distributed/executor/multi_utility.c b/src/backend/distributed/executor/multi_utility.c index 8863fcd37..620f71934 100644 --- a/src/backend/distributed/executor/multi_utility.c +++ b/src/backend/distributed/executor/multi_utility.c @@ -37,6 +37,7 @@ #include "commands/prepare.h" #include "distributed/citus_ruleutils.h" #include "distributed/colocation_utils.h" +#include "distributed/intermediate_results.h" #include "distributed/maintenanced.h" #include "distributed/master_metadata_utility.h" #include "distributed/master_protocol.h" @@ -108,6 +109,7 @@ static bool IsCitusExtensionStmt(Node *parsetree); /* Local functions forward declarations for Transmit statement */ static bool IsTransmitStmt(Node *parsetree); static void VerifyTransmitStmt(CopyStmt *copyStatement); +static bool IsCopyResultStmt(CopyStmt *copyStatement); /* Local functions forward declarations for processing distributed table commands */ static Node * ProcessCopyStmt(CopyStmt *copyStatement, char *completionTag, @@ -706,6 +708,34 @@ VerifyTransmitStmt(CopyStmt *copyStatement) } +/* + * IsCopyResultStmt determines whether the given copy statement is a + * COPY "resultkey" FROM STDIN WITH (format result) statement, which is used + * to copy query results from the coordinator into workers. + */ +static bool +IsCopyResultStmt(CopyStmt *copyStatement) +{ + ListCell *optionCell = NULL; + bool hasFormatReceive = false; + + /* extract WITH (...) options from the COPY statement */ + foreach(optionCell, copyStatement->options) + { + DefElem *defel = (DefElem *) lfirst(optionCell); + + if (strncmp(defel->defname, "format", NAMEDATALEN) == 0 && + strncmp(defGetString(defel), "result", NAMEDATALEN) == 0) + { + hasFormatReceive = true; + break; + } + } + + return hasFormatReceive; +} + + /* * ProcessCopyStmt handles Citus specific concerns for COPY like supporting * COPYing from distributed tables and preventing unsupported actions. The @@ -723,6 +753,19 @@ ProcessCopyStmt(CopyStmt *copyStatement, char *completionTag, bool *commandMustR { *commandMustRunAsOwner = false; /* make sure variable is initialized */ + /* + * Handle special COPY "resultid" FROM STDIN WITH (format result) commands + * for sending intermediate results to workers.
+ */ + if (IsCopyResultStmt(copyStatement)) + { + const char *resultId = copyStatement->relation->relname; + + ReceiveQueryResultViaCopy(resultId); + + return NULL; + } + /* * We check whether a distributed relation is affected. For that, we need to open the * relation. To prevent race conditions with later lookups, lock the table, and modify diff --git a/src/backend/distributed/planner/multi_logical_planner.c b/src/backend/distributed/planner/multi_logical_planner.c index fc4891d67..ae38ded87 100644 --- a/src/backend/distributed/planner/multi_logical_planner.c +++ b/src/backend/distributed/planner/multi_logical_planner.c @@ -67,7 +67,8 @@ typedef enum RecurringTuplesType RECURRING_TUPLES_INVALID = 0, RECURRING_TUPLES_REFERENCE_TABLE, RECURRING_TUPLES_FUNCTION, - RECURRING_TUPLES_EMPTY_JOIN_TREE + RECURRING_TUPLES_EMPTY_JOIN_TREE, + RECURRING_TUPLES_RESULT_FUNCTION } RecurringTuplesType; @@ -129,6 +130,8 @@ static bool RelationInfoContainsRecurringTuples(PlannerInfo *plannerInfo, RelOptInfo *relationInfo, RecurringTuplesType *recurType); static bool HasRecurringTuples(Node *node, RecurringTuplesType *recurType); +static bool ContainsReadIntermediateResultFunction(Node *node); +static bool IsReadIntermediateResultFunction(Node *node); static void ValidateClauseList(List *clauseList); static bool ExtractFromExpressionWalker(Node *node, QualifierWalkerContext *walkerContext); @@ -952,6 +955,14 @@ DeferErrorIfUnsupportedSublinkAndReferenceTable(Query *queryTree) "clause when the query has subqueries in " "WHERE clause", NULL); } + else if (recurType == RECURRING_TUPLES_RESULT_FUNCTION) + { + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "cannot pushdown the subquery", + "Complex subqueries and CTEs are not allowed in " + "the FROM clause when the query has subqueries in the " + "WHERE clause", NULL); + } else { return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, @@ -1218,6 +1229,13 @@ DeferErrorIfUnsupportedUnionQuery(Query *subqueryTree, "Subqueries without a FROM clause are not supported with " "union operator", NULL); } + else if (recurType == RECURRING_TUPLES_RESULT_FUNCTION) + { + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "cannot push down this subquery", + "Complex subqueries and CTEs are not supported within a " + "UNION", NULL); + } return NULL; @@ -1297,7 +1315,18 @@ DeferErrorIfUnsupportedTableCombination(Query *queryTree) } else if (rangeTableEntry->rtekind == RTE_FUNCTION) { - if (contain_mutable_functions((Node *) rangeTableEntry->functions)) + List *functionList = rangeTableEntry->functions; + + if (list_length(functionList) == 1 && + ContainsReadIntermediateResultFunction(linitial(functionList))) + { + /* + * The read_intermediate_result function is volatile, but we know + * it has the same result across all nodes and can therefore treat + * it as a reference table. 
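 + * + * For example, read_intermediate_result('squares', 'binary') AS res (x int, x2 int) + * can be joined with a distributed table just like a reference table.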
+ */ + } + else if (contain_mutable_functions((Node *) functionList)) { unsupportedTableCombination = true; errorDetail = "Only immutable functions can be used as a table " @@ -2003,7 +2032,13 @@ DeferredErrorIfUnsupportedRecurringTuplesJoin( "There exist a subquery without FROM in the outer " "part of the outer join", NULL); } - + else if (recurType == RECURRING_TUPLES_RESULT_FUNCTION) + { + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "cannot pushdown the subquery", + "Complex subqueries and CTEs cannot be in the outer " + "part of the outer join", NULL); + } return NULL; } @@ -2141,7 +2176,17 @@ HasRecurringTuples(Node *node, RecurringTuplesType *recurType) } else if (rangeTableEntry->rtekind == RTE_FUNCTION) { - *recurType = RECURRING_TUPLES_FUNCTION; + List *functionList = rangeTableEntry->functions; + + if (list_length(functionList) == 1 && + ContainsReadIntermediateResultFunction((Node *) functionList)) + { + *recurType = RECURRING_TUPLES_RESULT_FUNCTION; + } + else + { + *recurType = RECURRING_TUPLES_FUNCTION; + } /* * Tuples from functions will recur in every query on shards that includes @@ -2176,6 +2221,38 @@ HasRecurringTuples(Node *node, RecurringTuplesType *recurType) } +/* + * ContainsReadIntermediateResultFunction determines whether an expression tree contains + * a call to the read_intermediate_result function. + */ +static bool +ContainsReadIntermediateResultFunction(Node *node) +{ + return FindNodeCheck(node, IsReadIntermediateResultFunction); +} + + +/* + * IsReadIntermediateResultFunction determines whether a given node is a function call + * to the read_intermediate_result function. + */ +static bool +IsReadIntermediateResultFunction(Node *node) +{ + if (IsA(node, FuncExpr)) + { + FuncExpr *funcExpr = (FuncExpr *) node; + + if (funcExpr->funcid == CitusReadIntermediateResultFuncId()) + { + return true; + } + } + + return false; +} + + /* + * ErrorIfQueryNotSupported checks that we can perform distributed planning for + * the given query. The checks in this function will be removed as we support diff --git a/src/backend/distributed/transaction/transaction_management.c b/src/backend/distributed/transaction/transaction_management.c index 15b57a74d..c73d4b603 100644 --- a/src/backend/distributed/transaction/transaction_management.c +++ b/src/backend/distributed/transaction/transaction_management.c @@ -22,6 +22,7 @@ #include "distributed/backend_data.h" #include "distributed/connection_management.h" #include "distributed/hash_helpers.h" +#include "distributed/intermediate_results.h" #include "distributed/multi_shard_transaction.h" #include "distributed/transaction_management.h" #include "distributed/placement_connection.h" @@ -154,6 +155,7 @@ CoordinatedTransactionCallback(XactEvent event, void *arg) * transaction management. Do so before doing other work, so the * callbacks still can perform work if needed. */ + RemoveIntermediateResultsDirectory(); ResetShardPlacementTransactionState(); if (CurrentCoordinatedTransactionState == COORD_TRANS_PREPARED) @@ -191,6 +193,7 @@ CoordinatedTransactionCallback(XactEvent event, void *arg) * transaction management. Do so before doing other work, so the * callbacks still can perform work if needed.
*/ + RemoveIntermediateResultsDirectory(); ResetShardPlacementTransactionState(); /* handles both already prepared and open transactions */ diff --git a/src/backend/distributed/utils/metadata_cache.c b/src/backend/distributed/utils/metadata_cache.c index 7f5420d41..c7a1b4e37 100644 --- a/src/backend/distributed/utils/metadata_cache.c +++ b/src/backend/distributed/utils/metadata_cache.c @@ -117,6 +117,7 @@ typedef struct MetadataCacheData Oid distTransactionRelationId; Oid distTransactionGroupIndexId; Oid distTransactionRecordIndexId; + Oid readIntermediateResultFuncId; Oid extraDataContainerFuncId; Oid workerHashFunctionId; Oid extensionOwner; @@ -1834,6 +1835,31 @@ DistPlacementGroupidIndexId(void) } +/* return oid of the read_intermediate_result(text,citus.copy_format) function */ +Oid +CitusReadIntermediateResultFuncId(void) +{ + if (MetadataCache.readIntermediateResultFuncId == InvalidOid) + { + bool missingOK = false; + + List *copyFormatTypeNameList = list_make2(makeString("citus"), + makeString("copy_format")); + TypeName *copyFormatTypeName = makeTypeNameFromNameList(copyFormatTypeNameList); + Oid copyFormatTypeOid = LookupTypeNameOid(NULL, copyFormatTypeName, missingOK); + + List *functionNameList = list_make2(makeString("pg_catalog"), + makeString("read_intermediate_result")); + Oid paramOids[2] = { TEXTOID, copyFormatTypeOid }; + + MetadataCache.readIntermediateResultFuncId = + LookupFuncName(functionNameList, 2, paramOids, missingOK); + } + + return MetadataCache.readIntermediateResultFuncId; +} + + /* return oid of the citus_extradata_container(internal) function */ Oid CitusExtraDataContainerFuncId(void) diff --git a/src/include/distributed/intermediate_results.h b/src/include/distributed/intermediate_results.h new file mode 100644 index 000000000..139ab5d9b --- /dev/null +++ b/src/include/distributed/intermediate_results.h @@ -0,0 +1,29 @@ +/*------------------------------------------------------------------------- + * + * intermediate_results.h + * Functions for writing and reading intermediate results. + * + * Copyright (c) 2017, Citus Data, Inc. 
+ * + *------------------------------------------------------------------------- + */ + +#ifndef INTERMEDIATE_RESULTS_H +#define INTERMEDIATE_RESULTS_H + + +#include "fmgr.h" + +#include "distributed/multi_copy.h" +#include "nodes/execnodes.h" +#include "nodes/execnodes.h" +#include "nodes/pg_list.h" +#include "tcop/dest.h" +#include "utils/palloc.h" + + +extern void ReceiveQueryResultViaCopy(const char *resultId); +extern void RemoveIntermediateResultsDirectory(void); + + +#endif /* INTERMEDIATE_RESULTS_H */ diff --git a/src/include/distributed/metadata_cache.h b/src/include/distributed/metadata_cache.h index 6e0bfa720..027e3cb7e 100644 --- a/src/include/distributed/metadata_cache.h +++ b/src/include/distributed/metadata_cache.h @@ -124,6 +124,7 @@ extern Oid DistTransactionRecordIndexId(void); extern Oid DistPlacementGroupidIndexId(void); /* function oids */ +extern Oid CitusReadIntermediateResultFuncId(void); extern Oid CitusExtraDataContainerFuncId(void); extern Oid CitusWorkerHashFunctionId(void); diff --git a/src/include/distributed/multi_copy.h b/src/include/distributed/multi_copy.h index 6e51efbcc..0c9b4a2f8 100644 --- a/src/include/distributed/multi_copy.h +++ b/src/include/distributed/multi_copy.h @@ -115,6 +115,7 @@ extern CitusCopyDestReceiver * CreateCitusCopyDestReceiver(Oid relationId, EState *executorState, bool stopOnFailure); extern FmgrInfo * ColumnOutputFunctions(TupleDesc rowDescriptor, bool binaryFormat); +extern bool CanUseBinaryCopyFormat(TupleDesc tupleDescription); extern void AppendCopyRowData(Datum *valueArray, bool *isNullArray, TupleDesc rowDescriptor, CopyOutState rowOutputState, @@ -122,6 +123,7 @@ extern void AppendCopyRowData(Datum *valueArray, bool *isNullArray, CopyCoercionData *columnCoercionPaths); extern void AppendCopyBinaryHeaders(CopyOutState headerOutputState); extern void AppendCopyBinaryFooters(CopyOutState footerOutputState); +extern void EndRemoteCopy(int64 shardId, List *connectionList, bool stopOnFailure); extern void CitusCopyFrom(CopyStmt *copyStatement, char *completionTag); extern bool IsCopyFromWorker(CopyStmt *copyStatement); extern NodeAddress * MasterNodeAddress(CopyStmt *copyStatement); diff --git a/src/include/distributed/multi_executor.h b/src/include/distributed/multi_executor.h index c2fbff914..58aab1395 100644 --- a/src/include/distributed/multi_executor.h +++ b/src/include/distributed/multi_executor.h @@ -30,6 +30,15 @@ extern int MultiShardConnectionType; extern TupleTableSlot * ReturnTupleFromTuplestore(CitusScanState *scanState); extern void LoadTuplesIntoTupleStore(CitusScanState *citusScanState, Job *workerJob); +extern void ReadFileIntoTupleStore(char *fileName, char *copyFormat, TupleDesc + tupleDescriptor, Tuplestorestate *tupstore); +extern void ExecuteQueryStringIntoDestReceiver(const char *queryString, ParamListInfo + params, + DestReceiver *dest); +extern void ExecuteQueryIntoDestReceiver(Query *query, ParamListInfo params, + DestReceiver *dest); +extern void ExecutePlanIntoDestReceiver(PlannedStmt *queryPlan, ParamListInfo params, + DestReceiver *dest); #endif /* MULTI_EXECUTOR_H */ diff --git a/src/include/distributed/transmit.h b/src/include/distributed/transmit.h index 8f373cd37..32e2c84d5 100644 --- a/src/include/distributed/transmit.h +++ b/src/include/distributed/transmit.h @@ -12,11 +12,13 @@ #define TRANSMIT_H #include "lib/stringinfo.h" +#include "storage/fd.h" /* Function declarations for transmitting files between two nodes */ extern void RedirectCopyDataToRegularFile(const char *filename); extern void 
SendRegularFile(const char *filename); +extern File FileOpenForTransmit(const char *filename, int fileFlags, int fileMode); /* Function declaration local to commands and worker modules */ extern void FreeStringInfo(StringInfo stringInfo); diff --git a/src/test/regress/expected/intermediate_results.out b/src/test/regress/expected/intermediate_results.out new file mode 100644 index 000000000..78271499e --- /dev/null +++ b/src/test/regress/expected/intermediate_results.out @@ -0,0 +1,209 @@ +-- Test functions for copying intermediate results +CREATE SCHEMA intermediate_results; +SET search_path TO 'intermediate_results'; +-- in the same transaction we can read a result +BEGIN; +SELECT create_intermediate_result('squares', 'SELECT s, s*s FROM generate_series(1,5) s'); + create_intermediate_result +---------------------------- + 5 +(1 row) + +SELECT * FROM read_intermediate_result('squares', 'binary') AS res (x int, x2 int); + x | x2 +---+---- + 1 | 1 + 2 | 4 + 3 | 9 + 4 | 16 + 5 | 25 +(5 rows) + +COMMIT; +-- in separate transactions, the result is no longer available +SELECT create_intermediate_result('squares', 'SELECT s, s*s FROM generate_series(1,5) s'); + create_intermediate_result +---------------------------- + 5 +(1 row) + +SELECT * FROM read_intermediate_result('squares', 'binary') AS res (x int, x2 int); +ERROR: result "squares" does not exist +BEGIN; +CREATE TABLE interesting_squares (user_id text, interested_in text); +SELECT create_distributed_table('interesting_squares', 'user_id'); + create_distributed_table +-------------------------- + +(1 row) + +INSERT INTO interesting_squares VALUES ('jon', '2'), ('jon', '5'), ('jack', '3'); +-- put an intermediate result on all workers +SELECT broadcast_intermediate_result('squares', 'SELECT s, s*s FROM generate_series(1,5) s'); + broadcast_intermediate_result +------------------------------- + 5 +(1 row) + +-- query the intermediate result in a router query +SELECT x, x2 +FROM interesting_squares JOIN (SELECT * FROM read_intermediate_result('squares', 'binary') AS res (x int, x2 int)) squares ON (x::text = interested_in) +WHERE user_id = 'jon' +ORDER BY x; + x | x2 +---+---- + 2 | 4 + 5 | 25 +(2 rows) + +END; +BEGIN; +-- put an intermediate result on all workers +SELECT broadcast_intermediate_result('squares', 'SELECT s, s*s FROM generate_series(1,5) s'); + broadcast_intermediate_result +------------------------------- + 5 +(1 row) + +-- query the intermediate result in a distributed query +SELECT x, x2 +FROM interesting_squares +JOIN (SELECT * FROM read_intermediate_result('squares', 'binary') AS res (x int, x2 int)) squares ON (x::text = interested_in) +ORDER BY x; + x | x2 +---+---- + 2 | 4 + 3 | 9 + 5 | 25 +(3 rows) + +END; +-- files should now be cleaned up +SELECT x, x2 +FROM interesting_squares JOIN (SELECT * FROM read_intermediate_result('squares', 'binary') AS res (x text, x2 int)) squares ON (x = interested_in) +WHERE user_id = 'jon' +ORDER BY x; +WARNING: result "squares" does not exist +CONTEXT: while executing command on localhost:57638 +WARNING: result "squares" does not exist +CONTEXT: while executing command on localhost:57637 +ERROR: could not receive query results +-- try to read the file as text, will fail because of binary encoding +BEGIN; +SELECT create_intermediate_result('squares', 'SELECT s, s*s FROM generate_series(1,5) s'); + create_intermediate_result +---------------------------- + 5 +(1 row) + +SELECT * FROM read_intermediate_result('squares', 'binary') AS res (x text, x2 int); +ERROR: invalid byte sequence for 
encoding "UTF8": 0x00 +END; +-- try to read the file with wrong encoding +BEGIN; +SELECT create_intermediate_result('squares', 'SELECT s, s*s FROM generate_series(1,5) s'); + create_intermediate_result +---------------------------- + 5 +(1 row) + +SELECT * FROM read_intermediate_result('squares', 'csv') AS res (x int, x2 int); +ERROR: invalid input syntax for integer: "PGCOPY" +END; +-- try a composite type +CREATE TYPE intermediate_results.square_type AS (x text, x2 int); +SELECT run_command_on_workers('CREATE TYPE intermediate_results.square_type AS (x text, x2 int)'); + run_command_on_workers +----------------------------------- + (localhost,57637,t,"CREATE TYPE") + (localhost,57638,t,"CREATE TYPE") +(2 rows) + +CREATE TABLE stored_squares (user_id text, square intermediate_results.square_type, metadata jsonb); +INSERT INTO stored_squares VALUES ('jon', '(2,4)'::intermediate_results.square_type, '{"value":2}'); +INSERT INTO stored_squares VALUES ('jon', '(3,9)'::intermediate_results.square_type, '{"value":3}'); +INSERT INTO stored_squares VALUES ('jon', '(4,16)'::intermediate_results.square_type, '{"value":4}'); +INSERT INTO stored_squares VALUES ('jon', '(5,25)'::intermediate_results.square_type, '{"value":5}'); +-- composite types change the format to text +BEGIN; +SELECT create_intermediate_result('stored_squares', 'SELECT square FROM stored_squares'); + create_intermediate_result +---------------------------- + 4 +(1 row) + +SELECT * FROM read_intermediate_result('stored_squares', 'binary') AS res (s intermediate_results.square_type); +ERROR: COPY file signature not recognized +COMMIT; +BEGIN; +SELECT create_intermediate_result('stored_squares', 'SELECT square FROM stored_squares'); + create_intermediate_result +---------------------------- + 4 +(1 row) + +SELECT * FROM read_intermediate_result('stored_squares', 'text') AS res (s intermediate_results.square_type); + s +-------- + (2,4) + (3,9) + (4,16) + (5,25) +(4 rows) + +COMMIT; +BEGIN; +-- put an intermediate result in text format on all workers +SELECT broadcast_intermediate_result('stored_squares', 'SELECT square, metadata FROM stored_squares'); + broadcast_intermediate_result +------------------------------- + 4 +(1 row) + +-- query the intermediate result in a router query using text format +SELECT * FROM interesting_squares JOIN ( + SELECT * FROM + read_intermediate_result('stored_squares', 'text') AS res (s intermediate_results.square_type, m jsonb) +) squares +ON ((s).x = interested_in) WHERE user_id = 'jon' ORDER BY 1,2; + user_id | interested_in | s | m +---------+---------------+--------+-------------- + jon | 2 | (2,4) | {"value": 2} + jon | 5 | (5,25) | {"value": 5} +(2 rows) + +-- query the intermediate result in a real-time query using text format +SELECT * FROM interesting_squares JOIN ( + SELECT * FROM + read_intermediate_result('stored_squares', 'text') AS res (s intermediate_results.square_type, m jsonb) +) squares +ON ((s).x = interested_in) ORDER BY 1,2; + user_id | interested_in | s | m +---------+---------------+--------+-------------- + jack | 3 | (3,9) | {"value": 3} + jon | 2 | (2,4) | {"value": 2} + jon | 5 | (5,25) | {"value": 5} +(3 rows) + +END; +-- pipe query output into a result file and create a table to check the result +COPY (SELECT s, s*s FROM generate_series(1,5) s) +TO PROGRAM + $$psql -h localhost -p 57636 -U postgres -d regression -c "BEGIN; COPY squares FROM STDIN WITH (format result); CREATE TABLE intermediate_results.squares AS SELECT * FROM read_intermediate_result('squares', 'binary') 
AS res(x int, x2 int); END;"$$ +WITH (FORMAT binary); +SELECT * FROM squares ORDER BY x; + x | x2 +---+---- + 1 | 1 + 2 | 4 + 3 | 9 + 4 | 16 + 5 | 25 +(5 rows) + +DROP SCHEMA intermediate_results CASCADE; +NOTICE: drop cascades to 4 other objects +DETAIL: drop cascades to table interesting_squares +drop cascades to type square_type +drop cascades to table stored_squares +drop cascades to table squares diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index db3fa8998..356c89165 100644 --- a/src/test/regress/expected/multi_extension.out +++ b/src/test/regress/expected/multi_extension.out @@ -129,6 +129,8 @@ ALTER EXTENSION citus UPDATE TO '7.1-1'; ALTER EXTENSION citus UPDATE TO '7.1-2'; ALTER EXTENSION citus UPDATE TO '7.1-3'; ALTER EXTENSION citus UPDATE TO '7.1-4'; +ALTER EXTENSION citus UPDATE TO '7.2-1'; +ALTER EXTENSION citus UPDATE TO '7.2-2'; -- show running version SHOW citus.version; citus.version diff --git a/src/test/regress/multi_schedule b/src/test/regress/multi_schedule index 331b43e59..bef6fb31b 100644 --- a/src/test/regress/multi_schedule +++ b/src/test/regress/multi_schedule @@ -41,7 +41,7 @@ test: multi_partitioning_utils multi_partitioning # ---------- # Miscellaneous tests to check our query planning behavior # ---------- -test: multi_deparse_shard_query multi_distributed_transaction_id multi_real_time_transaction +test: multi_deparse_shard_query multi_distributed_transaction_id multi_real_time_transaction intermediate_results test: multi_explain test: multi_basic_queries multi_complex_expressions multi_subquery multi_subquery_complex_queries multi_subquery_behavioral_analytics test: multi_subquery_complex_reference_clause multi_subquery_window_functions multi_view multi_sql_function multi_prepare_sql diff --git a/src/test/regress/sql/intermediate_results.sql b/src/test/regress/sql/intermediate_results.sql new file mode 100644 index 000000000..2b23c416f --- /dev/null +++ b/src/test/regress/sql/intermediate_results.sql @@ -0,0 +1,108 @@ +-- Test functions for copying intermediate results +CREATE SCHEMA intermediate_results; +SET search_path TO 'intermediate_results'; + +-- in the same transaction we can read a result +BEGIN; +SELECT create_intermediate_result('squares', 'SELECT s, s*s FROM generate_series(1,5) s'); +SELECT * FROM read_intermediate_result('squares', 'binary') AS res (x int, x2 int); +COMMIT; + +-- in separate transactions, the result is no longer available +SELECT create_intermediate_result('squares', 'SELECT s, s*s FROM generate_series(1,5) s'); +SELECT * FROM read_intermediate_result('squares', 'binary') AS res (x int, x2 int); + +BEGIN; +CREATE TABLE interesting_squares (user_id text, interested_in text); +SELECT create_distributed_table('interesting_squares', 'user_id'); +INSERT INTO interesting_squares VALUES ('jon', '2'), ('jon', '5'), ('jack', '3'); + +-- put an intermediate result on all workers +SELECT broadcast_intermediate_result('squares', 'SELECT s, s*s FROM generate_series(1,5) s'); + +-- query the intermediate result in a router query +SELECT x, x2 +FROM interesting_squares JOIN (SELECT * FROM read_intermediate_result('squares', 'binary') AS res (x int, x2 int)) squares ON (x::text = interested_in) +WHERE user_id = 'jon' +ORDER BY x; + +END; + +BEGIN; +-- put an intermediate result on all workers +SELECT broadcast_intermediate_result('squares', 'SELECT s, s*s FROM generate_series(1,5) s'); +-- query the intermediate result in a distributed query +SELECT x, x2 +FROM interesting_squares 
+JOIN (SELECT * FROM read_intermediate_result('squares', 'binary') AS res (x int, x2 int)) squares ON (x::text = interested_in) +ORDER BY x; +END; + +-- files should now be cleaned up +SELECT x, x2 +FROM interesting_squares JOIN (SELECT * FROM read_intermediate_result('squares', 'binary') AS res (x text, x2 int)) squares ON (x = interested_in) +WHERE user_id = 'jon' +ORDER BY x; + +-- try to read the file as text, will fail because of binary encoding +BEGIN; +SELECT create_intermediate_result('squares', 'SELECT s, s*s FROM generate_series(1,5) s'); +SELECT * FROM read_intermediate_result('squares', 'binary') AS res (x text, x2 int); +END; + +-- try to read the file with wrong encoding +BEGIN; +SELECT create_intermediate_result('squares', 'SELECT s, s*s FROM generate_series(1,5) s'); +SELECT * FROM read_intermediate_result('squares', 'csv') AS res (x int, x2 int); +END; + +-- try a composite type +CREATE TYPE intermediate_results.square_type AS (x text, x2 int); +SELECT run_command_on_workers('CREATE TYPE intermediate_results.square_type AS (x text, x2 int)'); + +CREATE TABLE stored_squares (user_id text, square intermediate_results.square_type, metadata jsonb); +INSERT INTO stored_squares VALUES ('jon', '(2,4)'::intermediate_results.square_type, '{"value":2}'); +INSERT INTO stored_squares VALUES ('jon', '(3,9)'::intermediate_results.square_type, '{"value":3}'); +INSERT INTO stored_squares VALUES ('jon', '(4,16)'::intermediate_results.square_type, '{"value":4}'); +INSERT INTO stored_squares VALUES ('jon', '(5,25)'::intermediate_results.square_type, '{"value":5}'); + +-- composite types change the format to text +BEGIN; +SELECT create_intermediate_result('stored_squares', 'SELECT square FROM stored_squares'); +SELECT * FROM read_intermediate_result('stored_squares', 'binary') AS res (s intermediate_results.square_type); +COMMIT; + +BEGIN; +SELECT create_intermediate_result('stored_squares', 'SELECT square FROM stored_squares'); +SELECT * FROM read_intermediate_result('stored_squares', 'text') AS res (s intermediate_results.square_type); +COMMIT; + +BEGIN; +-- put an intermediate result in text format on all workers +SELECT broadcast_intermediate_result('stored_squares', 'SELECT square, metadata FROM stored_squares'); + +-- query the intermediate result in a router query using text format +SELECT * FROM interesting_squares JOIN ( + SELECT * FROM + read_intermediate_result('stored_squares', 'text') AS res (s intermediate_results.square_type, m jsonb) +) squares +ON ((s).x = interested_in) WHERE user_id = 'jon' ORDER BY 1,2; + +-- query the intermediate result in a real-time query using text format +SELECT * FROM interesting_squares JOIN ( + SELECT * FROM + read_intermediate_result('stored_squares', 'text') AS res (s intermediate_results.square_type, m jsonb) +) squares +ON ((s).x = interested_in) ORDER BY 1,2; + +END; + +-- pipe query output into a result file and create a table to check the result +COPY (SELECT s, s*s FROM generate_series(1,5) s) +TO PROGRAM + $$psql -h localhost -p 57636 -U postgres -d regression -c "BEGIN; COPY squares FROM STDIN WITH (format result); CREATE TABLE intermediate_results.squares AS SELECT * FROM read_intermediate_result('squares', 'binary') AS res(x int, x2 int); END;"$$ +WITH (FORMAT binary); + +SELECT * FROM squares ORDER BY x; + +DROP SCHEMA intermediate_results CASCADE; diff --git a/src/test/regress/sql/multi_extension.sql b/src/test/regress/sql/multi_extension.sql index bef52eed2..e676e2759 100644 --- a/src/test/regress/sql/multi_extension.sql +++ 
b/src/test/regress/sql/multi_extension.sql @@ -129,6 +129,8 @@ ALTER EXTENSION citus UPDATE TO '7.1-1'; ALTER EXTENSION citus UPDATE TO '7.1-2'; ALTER EXTENSION citus UPDATE TO '7.1-3'; ALTER EXTENSION citus UPDATE TO '7.1-4'; +ALTER EXTENSION citus UPDATE TO '7.2-1'; +ALTER EXTENSION citus UPDATE TO '7.2-2'; -- show running version SHOW citus.version;
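
For reference, a minimal usage sketch of how the three functions added in this patch compose, following the conventions of the regression test above; it is not part of the diff, and the result name 'evens' and its single-column schema are illustrative only. create_intermediate_result materializes a query into a named, transaction-scoped result file and returns the number of rows written; read_intermediate_result streams that file back as a set of records in the requested citus.copy_format; broadcast_intermediate_result writes the same file on all workers so distributed queries can read it locally.

-- sketch only: 'evens' is a hypothetical result name, not from the patch
BEGIN;
-- write the query's tuples to a result file; returns the row count (5)
SELECT create_intermediate_result('evens', 'SELECT s FROM generate_series(2,10,2) s');
-- read it back; the column list after AS must match the written tuples
SELECT sum(x) FROM read_intermediate_result('evens', 'binary') AS res (x int);
COMMIT;
-- the result file is removed at transaction end, so this now errors out
SELECT * FROM read_intermediate_result('evens', 'binary') AS res (x int);
-- broadcast_intermediate_result('evens', '...') would instead place the
-- result file on every worker, as the tests above exercise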