mirror of https://github.com/citusdata/citus.git
Merge pull request #1829 from citusdata/intermediate_result
Add infrastructure for moving around intermediate resultspull/1854/head
commit
98522d8d7f
|
@ -13,7 +13,7 @@ EXTVERSIONS = 5.0 5.0-1 5.0-2 \
|
||||||
6.2-1 6.2-2 6.2-3 6.2-4 \
|
6.2-1 6.2-2 6.2-3 6.2-4 \
|
||||||
7.0-1 7.0-2 7.0-3 7.0-4 7.0-5 7.0-6 7.0-7 7.0-8 7.0-9 7.0-10 7.0-11 7.0-12 7.0-13 7.0-14 7.0-15 \
|
7.0-1 7.0-2 7.0-3 7.0-4 7.0-5 7.0-6 7.0-7 7.0-8 7.0-9 7.0-10 7.0-11 7.0-12 7.0-13 7.0-14 7.0-15 \
|
||||||
7.1-1 7.1-2 7.1-3 7.1-4 \
|
7.1-1 7.1-2 7.1-3 7.1-4 \
|
||||||
7.2-1
|
7.2-1 7.2-2
|
||||||
|
|
||||||
# All citus--*.sql files in the source directory
|
# All citus--*.sql files in the source directory
|
||||||
DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql))
|
DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql))
|
||||||
|
@ -181,6 +181,8 @@ $(EXTENSION)--7.1-4.sql: $(EXTENSION)--7.1-3.sql $(EXTENSION)--7.1-3--7.1-4.sql
|
||||||
cat $^ > $@
|
cat $^ > $@
|
||||||
$(EXTENSION)--7.2-1.sql: $(EXTENSION)--7.1-4.sql $(EXTENSION)--7.1-4--7.2-1.sql
|
$(EXTENSION)--7.2-1.sql: $(EXTENSION)--7.1-4.sql $(EXTENSION)--7.1-4--7.2-1.sql
|
||||||
cat $^ > $@
|
cat $^ > $@
|
||||||
|
$(EXTENSION)--7.2-2.sql: $(EXTENSION)--7.2-1.sql $(EXTENSION)--7.2-1--7.2-2.sql
|
||||||
|
cat $^ > $@
|
||||||
|
|
||||||
NO_PGXS = 1
|
NO_PGXS = 1
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,24 @@
|
||||||
|
/* citus--7.2-1--7.2-2 */
|
||||||
|
|
||||||
|
CREATE TYPE citus.copy_format AS ENUM ('csv', 'binary', 'text');
|
||||||
|
|
||||||
|
CREATE OR REPLACE FUNCTION pg_catalog.read_intermediate_result(result_id text, format citus.copy_format default 'csv')
|
||||||
|
RETURNS record
|
||||||
|
LANGUAGE C STRICT VOLATILE PARALLEL SAFE
|
||||||
|
AS 'MODULE_PATHNAME', $$read_intermediate_result$$;
|
||||||
|
COMMENT ON FUNCTION pg_catalog.read_intermediate_result(text,citus.copy_format)
|
||||||
|
IS 'read a file and return it as a set of records';
|
||||||
|
|
||||||
|
CREATE OR REPLACE FUNCTION pg_catalog.create_intermediate_result(result_id text, query text)
|
||||||
|
RETURNS bigint
|
||||||
|
LANGUAGE C STRICT VOLATILE
|
||||||
|
AS 'MODULE_PATHNAME', $$create_intermediate_result$$;
|
||||||
|
COMMENT ON FUNCTION pg_catalog.create_intermediate_result(text,text)
|
||||||
|
IS 'execute a query and write its results to local result file';
|
||||||
|
|
||||||
|
CREATE OR REPLACE FUNCTION pg_catalog.broadcast_intermediate_result(result_id text, query text)
|
||||||
|
RETURNS bigint
|
||||||
|
LANGUAGE C STRICT VOLATILE
|
||||||
|
AS 'MODULE_PATHNAME', $$broadcast_intermediate_result$$;
|
||||||
|
COMMENT ON FUNCTION pg_catalog.broadcast_intermediate_result(text,text)
|
||||||
|
IS 'execute a query and write its results to an result file on all workers';
|
|
@ -1,6 +1,6 @@
|
||||||
# Citus extension
|
# Citus extension
|
||||||
comment = 'Citus distributed database'
|
comment = 'Citus distributed database'
|
||||||
default_version = '7.2-1'
|
default_version = '7.2-2'
|
||||||
module_pathname = '$libdir/citus'
|
module_pathname = '$libdir/citus'
|
||||||
relocatable = false
|
relocatable = false
|
||||||
schema = pg_catalog
|
schema = pg_catalog
|
||||||
|
|
|
@ -97,22 +97,18 @@ static void OpenCopyConnections(CopyStmt *copyStatement,
|
||||||
ShardConnections *shardConnections, bool stopOnFailure,
|
ShardConnections *shardConnections, bool stopOnFailure,
|
||||||
bool useBinaryCopyFormat);
|
bool useBinaryCopyFormat);
|
||||||
|
|
||||||
static bool CanUseBinaryCopyFormat(TupleDesc tupleDescription);
|
|
||||||
static bool BinaryOutputFunctionDefined(Oid typeId);
|
static bool BinaryOutputFunctionDefined(Oid typeId);
|
||||||
static List * MasterShardPlacementList(uint64 shardId);
|
static List * MasterShardPlacementList(uint64 shardId);
|
||||||
static List * RemoteFinalizedShardPlacementList(uint64 shardId);
|
static List * RemoteFinalizedShardPlacementList(uint64 shardId);
|
||||||
|
|
||||||
static void SendCopyBinaryHeaders(CopyOutState copyOutState, int64 shardId,
|
static void SendCopyBinaryHeaders(CopyOutState copyOutState, int64 shardId,
|
||||||
List *connectionList);
|
List *connectionList);
|
||||||
static void SendCopyBinaryFooters(CopyOutState copyOutState, int64 shardId,
|
static void SendCopyBinaryFooters(CopyOutState copyOutState, int64 shardId,
|
||||||
List *connectionList);
|
List *connectionList);
|
||||||
|
|
||||||
static StringInfo ConstructCopyStatement(CopyStmt *copyStatement, int64 shardId,
|
static StringInfo ConstructCopyStatement(CopyStmt *copyStatement, int64 shardId,
|
||||||
bool useBinaryCopyFormat);
|
bool useBinaryCopyFormat);
|
||||||
static void SendCopyDataToAll(StringInfo dataBuffer, int64 shardId, List *connectionList);
|
static void SendCopyDataToAll(StringInfo dataBuffer, int64 shardId, List *connectionList);
|
||||||
static void SendCopyDataToPlacement(StringInfo dataBuffer, int64 shardId,
|
static void SendCopyDataToPlacement(StringInfo dataBuffer, int64 shardId,
|
||||||
MultiConnection *connection);
|
MultiConnection *connection);
|
||||||
static void EndRemoteCopy(int64 shardId, List *connectionList, bool stopOnFailure);
|
|
||||||
static void ReportCopyError(MultiConnection *connection, PGresult *result);
|
static void ReportCopyError(MultiConnection *connection, PGresult *result);
|
||||||
static uint32 AvailableColumnCount(TupleDesc tupleDescriptor);
|
static uint32 AvailableColumnCount(TupleDesc tupleDescriptor);
|
||||||
static int64 StartCopyToNewShard(ShardConnections *shardConnections,
|
static int64 StartCopyToNewShard(ShardConnections *shardConnections,
|
||||||
|
@ -904,7 +900,7 @@ OpenCopyConnections(CopyStmt *copyStatement, ShardConnections *shardConnections,
|
||||||
* worker nodes for user-defined types. If the function can not detect a binary
|
* worker nodes for user-defined types. If the function can not detect a binary
|
||||||
* output function for any of the column, it returns false.
|
* output function for any of the column, it returns false.
|
||||||
*/
|
*/
|
||||||
static bool
|
bool
|
||||||
CanUseBinaryCopyFormat(TupleDesc tupleDescription)
|
CanUseBinaryCopyFormat(TupleDesc tupleDescription)
|
||||||
{
|
{
|
||||||
bool useBinaryCopyFormat = true;
|
bool useBinaryCopyFormat = true;
|
||||||
|
@ -1169,7 +1165,7 @@ SendCopyDataToPlacement(StringInfo dataBuffer, int64 shardId, MultiConnection *c
|
||||||
* If stopOnFailure is true, then EndRemoteCopy reports an error on failure,
|
* If stopOnFailure is true, then EndRemoteCopy reports an error on failure,
|
||||||
* otherwise it reports a warning or continues.
|
* otherwise it reports a warning or continues.
|
||||||
*/
|
*/
|
||||||
static void
|
void
|
||||||
EndRemoteCopy(int64 shardId, List *connectionList, bool stopOnFailure)
|
EndRemoteCopy(int64 shardId, List *connectionList, bool stopOnFailure)
|
||||||
{
|
{
|
||||||
ListCell *connectionCell = NULL;
|
ListCell *connectionCell = NULL;
|
||||||
|
|
|
@ -17,6 +17,7 @@
|
||||||
|
|
||||||
#include "distributed/relay_utility.h"
|
#include "distributed/relay_utility.h"
|
||||||
#include "distributed/transmit.h"
|
#include "distributed/transmit.h"
|
||||||
|
#include "distributed/worker_protocol.h"
|
||||||
#include "distributed/version_compat.h"
|
#include "distributed/version_compat.h"
|
||||||
#include "libpq/libpq.h"
|
#include "libpq/libpq.h"
|
||||||
#include "libpq/pqformat.h"
|
#include "libpq/pqformat.h"
|
||||||
|
@ -24,7 +25,6 @@
|
||||||
|
|
||||||
|
|
||||||
/* Local functions forward declarations */
|
/* Local functions forward declarations */
|
||||||
static File FileOpenForTransmit(const char *filename, int fileFlags, int fileMode);
|
|
||||||
static void SendCopyInStart(void);
|
static void SendCopyInStart(void);
|
||||||
static void SendCopyOutStart(void);
|
static void SendCopyOutStart(void);
|
||||||
static void SendCopyDone(void);
|
static void SendCopyDone(void);
|
||||||
|
@ -150,7 +150,7 @@ FreeStringInfo(StringInfo stringInfo)
|
||||||
* the function returns the internal file handle for the opened file. On failure
|
* the function returns the internal file handle for the opened file. On failure
|
||||||
* the function errors out.
|
* the function errors out.
|
||||||
*/
|
*/
|
||||||
static File
|
File
|
||||||
FileOpenForTransmit(const char *filename, int fileFlags, int fileMode)
|
FileOpenForTransmit(const char *filename, int fileFlags, int fileMode)
|
||||||
{
|
{
|
||||||
File fileDesc = -1;
|
File fileDesc = -1;
|
||||||
|
|
|
@ -36,8 +36,6 @@
|
||||||
|
|
||||||
static void ExecuteSelectIntoRelation(Oid targetRelationId, List *insertTargetList,
|
static void ExecuteSelectIntoRelation(Oid targetRelationId, List *insertTargetList,
|
||||||
Query *selectQuery, EState *executorState);
|
Query *selectQuery, EState *executorState);
|
||||||
static void ExecuteIntoDestReceiver(Query *query, ParamListInfo params,
|
|
||||||
DestReceiver *dest);
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -137,50 +135,9 @@ ExecuteSelectIntoRelation(Oid targetRelationId, List *insertTargetList,
|
||||||
partitionColumnIndex, executorState,
|
partitionColumnIndex, executorState,
|
||||||
stopOnFailure);
|
stopOnFailure);
|
||||||
|
|
||||||
ExecuteIntoDestReceiver(selectQuery, paramListInfo, (DestReceiver *) copyDest);
|
ExecuteQueryIntoDestReceiver(selectQuery, paramListInfo, (DestReceiver *) copyDest);
|
||||||
|
|
||||||
executorState->es_processed = copyDest->tuplesSent;
|
executorState->es_processed = copyDest->tuplesSent;
|
||||||
|
|
||||||
XactModificationLevel = XACT_MODIFICATION_DATA;
|
XactModificationLevel = XACT_MODIFICATION_DATA;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* ExecuteIntoDestReceiver plans and executes a query and sends results to the given
|
|
||||||
* DestReceiver.
|
|
||||||
*/
|
|
||||||
static void
|
|
||||||
ExecuteIntoDestReceiver(Query *query, ParamListInfo params, DestReceiver *dest)
|
|
||||||
{
|
|
||||||
PlannedStmt *queryPlan = NULL;
|
|
||||||
Portal portal = NULL;
|
|
||||||
int eflags = 0;
|
|
||||||
int cursorOptions = 0;
|
|
||||||
long count = FETCH_ALL;
|
|
||||||
|
|
||||||
/* create a new portal for executing the query */
|
|
||||||
portal = CreateNewPortal();
|
|
||||||
|
|
||||||
/* don't display the portal in pg_cursors, it is for internal use only */
|
|
||||||
portal->visible = false;
|
|
||||||
|
|
||||||
cursorOptions = CURSOR_OPT_PARALLEL_OK;
|
|
||||||
|
|
||||||
/* plan the subquery, this may be another distributed query */
|
|
||||||
queryPlan = pg_plan_query(query, cursorOptions, params);
|
|
||||||
|
|
||||||
PortalDefineQuery(portal,
|
|
||||||
NULL,
|
|
||||||
"",
|
|
||||||
"SELECT",
|
|
||||||
list_make1(queryPlan),
|
|
||||||
NULL);
|
|
||||||
|
|
||||||
PortalStart(portal, params, eflags, GetActiveSnapshot());
|
|
||||||
#if (PG_VERSION_NUM >= 100000)
|
|
||||||
PortalRun(portal, count, false, true, dest, dest, NULL);
|
|
||||||
#else
|
|
||||||
PortalRun(portal, count, false, dest, dest, NULL);
|
|
||||||
#endif
|
|
||||||
PortalDrop(portal, false);
|
|
||||||
}
|
|
||||||
|
|
|
@ -0,0 +1,755 @@
|
||||||
|
/*-------------------------------------------------------------------------
|
||||||
|
*
|
||||||
|
* intermediate_results.c
|
||||||
|
* Functions for writing and reading intermediate results.
|
||||||
|
*
|
||||||
|
* Copyright (c) 2017, Citus Data, Inc.
|
||||||
|
*
|
||||||
|
*-------------------------------------------------------------------------
|
||||||
|
*/
|
||||||
|
#include <sys/stat.h>
|
||||||
|
#include <unistd.h>
|
||||||
|
|
||||||
|
#include "postgres.h"
|
||||||
|
#include "funcapi.h"
|
||||||
|
#include "libpq-fe.h"
|
||||||
|
#include "miscadmin.h"
|
||||||
|
#include "pgstat.h"
|
||||||
|
|
||||||
|
#include "catalog/pg_enum.h"
|
||||||
|
#include "commands/copy.h"
|
||||||
|
#include "distributed/connection_management.h"
|
||||||
|
#include "distributed/intermediate_results.h"
|
||||||
|
#include "distributed/master_metadata_utility.h"
|
||||||
|
#include "distributed/metadata_cache.h"
|
||||||
|
#include "distributed/multi_copy.h"
|
||||||
|
#include "distributed/multi_executor.h"
|
||||||
|
#include "distributed/remote_commands.h"
|
||||||
|
#include "distributed/transmit.h"
|
||||||
|
#include "distributed/transaction_identifier.h"
|
||||||
|
#include "distributed/worker_protocol.h"
|
||||||
|
#include "nodes/makefuncs.h"
|
||||||
|
#include "nodes/parsenodes.h"
|
||||||
|
#include "nodes/primnodes.h"
|
||||||
|
#include "storage/fd.h"
|
||||||
|
#include "tcop/tcopprot.h"
|
||||||
|
#include "utils/builtins.h"
|
||||||
|
#include "utils/lsyscache.h"
|
||||||
|
#include "utils/memutils.h"
|
||||||
|
#include "utils/syscache.h"
|
||||||
|
|
||||||
|
|
||||||
|
static bool CreatedResultsDirectory = false;
|
||||||
|
|
||||||
|
|
||||||
|
/* CopyDestReceiver can be used to stream results into a distributed table */
|
||||||
|
typedef struct RemoteFileDestReceiver
|
||||||
|
{
|
||||||
|
/* public DestReceiver interface */
|
||||||
|
DestReceiver pub;
|
||||||
|
|
||||||
|
char *resultId;
|
||||||
|
|
||||||
|
/* descriptor of the tuples that are sent to the worker */
|
||||||
|
TupleDesc tupleDescriptor;
|
||||||
|
|
||||||
|
/* EState for per-tuple memory allocation */
|
||||||
|
EState *executorState;
|
||||||
|
|
||||||
|
/* MemoryContext for DestReceiver session */
|
||||||
|
MemoryContext memoryContext;
|
||||||
|
|
||||||
|
/* worker nodes to send data to */
|
||||||
|
List *initialNodeList;
|
||||||
|
List *connectionList;
|
||||||
|
|
||||||
|
/* whether to write to a local file */
|
||||||
|
bool writeLocalFile;
|
||||||
|
File fileDesc;
|
||||||
|
|
||||||
|
/* state on how to copy out data types */
|
||||||
|
CopyOutState copyOutState;
|
||||||
|
FmgrInfo *columnOutputFunctions;
|
||||||
|
|
||||||
|
/* number of tuples sent */
|
||||||
|
uint64 tuplesSent;
|
||||||
|
} RemoteFileDestReceiver;
|
||||||
|
|
||||||
|
|
||||||
|
static RemoteFileDestReceiver * CreateRemoteFileDestReceiver(char *resultId,
|
||||||
|
EState *executorState,
|
||||||
|
List *initialNodeList,
|
||||||
|
bool writeLocalFile);
|
||||||
|
static void RemoteFileDestReceiverStartup(DestReceiver *dest, int operation,
|
||||||
|
TupleDesc inputTupleDescriptor);
|
||||||
|
static StringInfo ConstructCopyResultStatement(const char *resultId);
|
||||||
|
static void WriteToLocalFile(StringInfo copyData, File fileDesc);
|
||||||
|
static bool RemoteFileDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest);
|
||||||
|
static void BroadcastCopyData(StringInfo dataBuffer, List *connectionList);
|
||||||
|
static void SendCopyDataOverConnection(StringInfo dataBuffer,
|
||||||
|
MultiConnection *connection);
|
||||||
|
static void RemoteFileDestReceiverShutdown(DestReceiver *destReceiver);
|
||||||
|
static void RemoteFileDestReceiverDestroy(DestReceiver *destReceiver);
|
||||||
|
|
||||||
|
static char * CreateIntermediateResultsDirectory(void);
|
||||||
|
static char * IntermediateResultsDirectory(void);
|
||||||
|
static char * QueryResultFileName(const char *resultId);
|
||||||
|
|
||||||
|
|
||||||
|
/* exports for SQL callable functions */
|
||||||
|
PG_FUNCTION_INFO_V1(read_intermediate_result);
|
||||||
|
PG_FUNCTION_INFO_V1(broadcast_intermediate_result);
|
||||||
|
PG_FUNCTION_INFO_V1(create_intermediate_result);
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* broadcast_intermediate_result executes a query and streams the results
|
||||||
|
* into a file on all workers.
|
||||||
|
*/
|
||||||
|
Datum
|
||||||
|
broadcast_intermediate_result(PG_FUNCTION_ARGS)
|
||||||
|
{
|
||||||
|
text *resultIdText = PG_GETARG_TEXT_P(0);
|
||||||
|
char *resultIdString = text_to_cstring(resultIdText);
|
||||||
|
text *queryText = PG_GETARG_TEXT_P(1);
|
||||||
|
char *queryString = text_to_cstring(queryText);
|
||||||
|
EState *estate = NULL;
|
||||||
|
List *nodeList = NIL;
|
||||||
|
bool writeLocalFile = false;
|
||||||
|
RemoteFileDestReceiver *resultDest = NULL;
|
||||||
|
ParamListInfo paramListInfo = NULL;
|
||||||
|
|
||||||
|
CheckCitusVersion(ERROR);
|
||||||
|
|
||||||
|
nodeList = ActivePrimaryNodeList();
|
||||||
|
estate = CreateExecutorState();
|
||||||
|
resultDest = CreateRemoteFileDestReceiver(resultIdString, estate, nodeList,
|
||||||
|
writeLocalFile);
|
||||||
|
|
||||||
|
ExecuteQueryStringIntoDestReceiver(queryString, paramListInfo,
|
||||||
|
(DestReceiver *) resultDest);
|
||||||
|
|
||||||
|
FreeExecutorState(estate);
|
||||||
|
|
||||||
|
PG_RETURN_INT64(resultDest->tuplesSent);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* create_intermediate_result executes a query and writes the results
|
||||||
|
* into a local file.
|
||||||
|
*/
|
||||||
|
Datum
|
||||||
|
create_intermediate_result(PG_FUNCTION_ARGS)
|
||||||
|
{
|
||||||
|
text *resultIdText = PG_GETARG_TEXT_P(0);
|
||||||
|
char *resultIdString = text_to_cstring(resultIdText);
|
||||||
|
text *queryText = PG_GETARG_TEXT_P(1);
|
||||||
|
char *queryString = text_to_cstring(queryText);
|
||||||
|
EState *estate = NULL;
|
||||||
|
List *nodeList = NIL;
|
||||||
|
bool writeLocalFile = true;
|
||||||
|
RemoteFileDestReceiver *resultDest = NULL;
|
||||||
|
ParamListInfo paramListInfo = NULL;
|
||||||
|
|
||||||
|
CheckCitusVersion(ERROR);
|
||||||
|
|
||||||
|
estate = CreateExecutorState();
|
||||||
|
resultDest = CreateRemoteFileDestReceiver(resultIdString, estate, nodeList,
|
||||||
|
writeLocalFile);
|
||||||
|
|
||||||
|
ExecuteQueryStringIntoDestReceiver(queryString, paramListInfo,
|
||||||
|
(DestReceiver *) resultDest);
|
||||||
|
|
||||||
|
FreeExecutorState(estate);
|
||||||
|
|
||||||
|
PG_RETURN_INT64(resultDest->tuplesSent);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* CreateRemoteFileDestReceiver creates a DestReceiver that streams results
|
||||||
|
* to a set of worker nodes.
|
||||||
|
*/
|
||||||
|
static RemoteFileDestReceiver *
|
||||||
|
CreateRemoteFileDestReceiver(char *resultId, EState *executorState,
|
||||||
|
List *initialNodeList, bool writeLocalFile)
|
||||||
|
{
|
||||||
|
RemoteFileDestReceiver *resultDest = NULL;
|
||||||
|
|
||||||
|
resultDest = (RemoteFileDestReceiver *) palloc0(sizeof(RemoteFileDestReceiver));
|
||||||
|
|
||||||
|
/* set up the DestReceiver function pointers */
|
||||||
|
resultDest->pub.receiveSlot = RemoteFileDestReceiverReceive;
|
||||||
|
resultDest->pub.rStartup = RemoteFileDestReceiverStartup;
|
||||||
|
resultDest->pub.rShutdown = RemoteFileDestReceiverShutdown;
|
||||||
|
resultDest->pub.rDestroy = RemoteFileDestReceiverDestroy;
|
||||||
|
resultDest->pub.mydest = DestCopyOut;
|
||||||
|
|
||||||
|
/* set up output parameters */
|
||||||
|
resultDest->resultId = resultId;
|
||||||
|
resultDest->executorState = executorState;
|
||||||
|
resultDest->initialNodeList = initialNodeList;
|
||||||
|
resultDest->memoryContext = CurrentMemoryContext;
|
||||||
|
resultDest->writeLocalFile = writeLocalFile;
|
||||||
|
|
||||||
|
return resultDest;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* RemoteFileDestReceiverStartup implements the rStartup interface of
|
||||||
|
* RemoteFileDestReceiver. It opens the relation
|
||||||
|
*/
|
||||||
|
static void
|
||||||
|
RemoteFileDestReceiverStartup(DestReceiver *dest, int operation,
|
||||||
|
TupleDesc inputTupleDescriptor)
|
||||||
|
{
|
||||||
|
RemoteFileDestReceiver *resultDest = (RemoteFileDestReceiver *) dest;
|
||||||
|
|
||||||
|
const char *resultId = resultDest->resultId;
|
||||||
|
|
||||||
|
CopyOutState copyOutState = NULL;
|
||||||
|
const char *delimiterCharacter = "\t";
|
||||||
|
const char *nullPrintCharacter = "\\N";
|
||||||
|
|
||||||
|
List *initialNodeList = resultDest->initialNodeList;
|
||||||
|
ListCell *initialNodeCell = NULL;
|
||||||
|
List *connectionList = NIL;
|
||||||
|
ListCell *connectionCell = NULL;
|
||||||
|
|
||||||
|
resultDest->tupleDescriptor = inputTupleDescriptor;
|
||||||
|
|
||||||
|
/* define how tuples will be serialised */
|
||||||
|
copyOutState = (CopyOutState) palloc0(sizeof(CopyOutStateData));
|
||||||
|
copyOutState->delim = (char *) delimiterCharacter;
|
||||||
|
copyOutState->null_print = (char *) nullPrintCharacter;
|
||||||
|
copyOutState->null_print_client = (char *) nullPrintCharacter;
|
||||||
|
copyOutState->binary = CanUseBinaryCopyFormat(inputTupleDescriptor);
|
||||||
|
copyOutState->fe_msgbuf = makeStringInfo();
|
||||||
|
copyOutState->rowcontext = GetPerTupleMemoryContext(resultDest->executorState);
|
||||||
|
resultDest->copyOutState = copyOutState;
|
||||||
|
|
||||||
|
resultDest->columnOutputFunctions = ColumnOutputFunctions(inputTupleDescriptor,
|
||||||
|
copyOutState->binary);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Make sure that this transaction has a distributed transaction ID.
|
||||||
|
*
|
||||||
|
* Intermediate results will be stored in a directory that is derived from
|
||||||
|
* the distributed transaction ID across all workers and on the coordinator
|
||||||
|
* itself. Even if we only store results locally, we still want to assign
|
||||||
|
* a transaction ID in case we later store results on workers.
|
||||||
|
*
|
||||||
|
* When we start using broadcast_intermediate_result from workers, we
|
||||||
|
* need to make sure that we don't override the transaction ID here.
|
||||||
|
*/
|
||||||
|
BeginOrContinueCoordinatedTransaction();
|
||||||
|
|
||||||
|
if (resultDest->writeLocalFile)
|
||||||
|
{
|
||||||
|
const int fileFlags = (O_APPEND | O_CREAT | O_RDWR | O_TRUNC | PG_BINARY);
|
||||||
|
const int fileMode = (S_IRUSR | S_IWUSR);
|
||||||
|
const char *fileName = NULL;
|
||||||
|
|
||||||
|
/* make sure the directory exists */
|
||||||
|
CreateIntermediateResultsDirectory();
|
||||||
|
|
||||||
|
fileName = QueryResultFileName(resultId);
|
||||||
|
|
||||||
|
elog(DEBUG1, "writing to local file \"%s\"", fileName);
|
||||||
|
|
||||||
|
resultDest->fileDesc = FileOpenForTransmit(fileName, fileFlags, fileMode);
|
||||||
|
}
|
||||||
|
|
||||||
|
foreach(initialNodeCell, initialNodeList)
|
||||||
|
{
|
||||||
|
WorkerNode *workerNode = (WorkerNode *) lfirst(initialNodeCell);
|
||||||
|
int connectionFlags = 0;
|
||||||
|
char *nodeName = workerNode->workerName;
|
||||||
|
int nodePort = workerNode->workerPort;
|
||||||
|
MultiConnection *connection = NULL;
|
||||||
|
|
||||||
|
connection = StartNodeConnection(connectionFlags, nodeName, nodePort);
|
||||||
|
ClaimConnectionExclusively(connection);
|
||||||
|
MarkRemoteTransactionCritical(connection);
|
||||||
|
|
||||||
|
connectionList = lappend(connectionList, connection);
|
||||||
|
}
|
||||||
|
|
||||||
|
FinishConnectionListEstablishment(connectionList);
|
||||||
|
|
||||||
|
/* must open transaction blocks to use intermediate results */
|
||||||
|
RemoteTransactionsBeginIfNecessary(connectionList);
|
||||||
|
|
||||||
|
foreach(connectionCell, connectionList)
|
||||||
|
{
|
||||||
|
MultiConnection *connection = (MultiConnection *) lfirst(connectionCell);
|
||||||
|
StringInfo copyCommand = NULL;
|
||||||
|
bool querySent = false;
|
||||||
|
|
||||||
|
copyCommand = ConstructCopyResultStatement(resultId);
|
||||||
|
|
||||||
|
querySent = SendRemoteCommand(connection, copyCommand->data);
|
||||||
|
if (!querySent)
|
||||||
|
{
|
||||||
|
ReportConnectionError(connection, ERROR);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
foreach(connectionCell, connectionList)
|
||||||
|
{
|
||||||
|
MultiConnection *connection = (MultiConnection *) lfirst(connectionCell);
|
||||||
|
bool raiseInterrupts = true;
|
||||||
|
|
||||||
|
PGresult *result = GetRemoteCommandResult(connection, raiseInterrupts);
|
||||||
|
if (PQresultStatus(result) != PGRES_COPY_IN)
|
||||||
|
{
|
||||||
|
ReportResultError(connection, result, ERROR);
|
||||||
|
}
|
||||||
|
|
||||||
|
PQclear(result);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (copyOutState->binary)
|
||||||
|
{
|
||||||
|
/* send headers when using binary encoding */
|
||||||
|
resetStringInfo(copyOutState->fe_msgbuf);
|
||||||
|
AppendCopyBinaryHeaders(copyOutState);
|
||||||
|
BroadcastCopyData(copyOutState->fe_msgbuf, connectionList);
|
||||||
|
|
||||||
|
if (resultDest->writeLocalFile)
|
||||||
|
{
|
||||||
|
WriteToLocalFile(copyOutState->fe_msgbuf, resultDest->fileDesc);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
resultDest->connectionList = connectionList;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* ConstructCopyResultStatement constructs the text of a COPY statement
|
||||||
|
* for copying into a result file.
|
||||||
|
*/
|
||||||
|
static StringInfo
|
||||||
|
ConstructCopyResultStatement(const char *resultId)
|
||||||
|
{
|
||||||
|
StringInfo command = makeStringInfo();
|
||||||
|
|
||||||
|
appendStringInfo(command, "COPY \"%s\" FROM STDIN WITH (format result)",
|
||||||
|
resultId);
|
||||||
|
|
||||||
|
return command;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* RemoteFileDestReceiverReceive implements the receiveSlot function of
|
||||||
|
* RemoteFileDestReceiver. It takes a TupleTableSlot and sends the contents to
|
||||||
|
* all worker nodes.
|
||||||
|
*/
|
||||||
|
static bool
|
||||||
|
RemoteFileDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest)
|
||||||
|
{
|
||||||
|
RemoteFileDestReceiver *resultDest = (RemoteFileDestReceiver *) dest;
|
||||||
|
|
||||||
|
TupleDesc tupleDescriptor = resultDest->tupleDescriptor;
|
||||||
|
|
||||||
|
List *connectionList = resultDest->connectionList;
|
||||||
|
CopyOutState copyOutState = resultDest->copyOutState;
|
||||||
|
FmgrInfo *columnOutputFunctions = resultDest->columnOutputFunctions;
|
||||||
|
|
||||||
|
Datum *columnValues = NULL;
|
||||||
|
bool *columnNulls = NULL;
|
||||||
|
StringInfo copyData = copyOutState->fe_msgbuf;
|
||||||
|
|
||||||
|
EState *executorState = resultDest->executorState;
|
||||||
|
MemoryContext executorTupleContext = GetPerTupleMemoryContext(executorState);
|
||||||
|
MemoryContext oldContext = MemoryContextSwitchTo(executorTupleContext);
|
||||||
|
|
||||||
|
slot_getallattrs(slot);
|
||||||
|
|
||||||
|
columnValues = slot->tts_values;
|
||||||
|
columnNulls = slot->tts_isnull;
|
||||||
|
|
||||||
|
resetStringInfo(copyData);
|
||||||
|
|
||||||
|
/* construct row in COPY format */
|
||||||
|
AppendCopyRowData(columnValues, columnNulls, tupleDescriptor,
|
||||||
|
copyOutState, columnOutputFunctions, NULL);
|
||||||
|
|
||||||
|
/* send row to nodes */
|
||||||
|
BroadcastCopyData(copyData, connectionList);
|
||||||
|
|
||||||
|
/* write to local file (if applicable) */
|
||||||
|
if (resultDest->writeLocalFile)
|
||||||
|
{
|
||||||
|
WriteToLocalFile(copyOutState->fe_msgbuf, resultDest->fileDesc);
|
||||||
|
}
|
||||||
|
|
||||||
|
MemoryContextSwitchTo(oldContext);
|
||||||
|
|
||||||
|
resultDest->tuplesSent++;
|
||||||
|
|
||||||
|
ResetPerTupleExprContext(executorState);
|
||||||
|
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* WriteToLocalResultsFile writes the bytes in a StringInfo to a local file.
|
||||||
|
*/
|
||||||
|
static void
|
||||||
|
WriteToLocalFile(StringInfo copyData, File fileDesc)
|
||||||
|
{
|
||||||
|
#if (PG_VERSION_NUM >= 100000)
|
||||||
|
int bytesWritten = FileWrite(fileDesc, copyData->data, copyData->len, PG_WAIT_IO);
|
||||||
|
#else
|
||||||
|
int bytesWritten = FileWrite(fileDesc, copyData->data, copyData->len);
|
||||||
|
#endif
|
||||||
|
if (bytesWritten < 0)
|
||||||
|
{
|
||||||
|
ereport(ERROR, (errcode_for_file_access(),
|
||||||
|
errmsg("could not append to file: %m")));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* RemoteFileDestReceiverShutdown implements the rShutdown interface of
|
||||||
|
* RemoteFileDestReceiver. It ends the COPY on all the open connections and closes
|
||||||
|
* the relation.
|
||||||
|
*/
|
||||||
|
static void
|
||||||
|
RemoteFileDestReceiverShutdown(DestReceiver *destReceiver)
|
||||||
|
{
|
||||||
|
RemoteFileDestReceiver *resultDest = (RemoteFileDestReceiver *) destReceiver;
|
||||||
|
|
||||||
|
List *connectionList = resultDest->connectionList;
|
||||||
|
CopyOutState copyOutState = resultDest->copyOutState;
|
||||||
|
|
||||||
|
if (copyOutState->binary)
|
||||||
|
{
|
||||||
|
/* send footers when using binary encoding */
|
||||||
|
resetStringInfo(copyOutState->fe_msgbuf);
|
||||||
|
AppendCopyBinaryFooters(copyOutState);
|
||||||
|
BroadcastCopyData(copyOutState->fe_msgbuf, connectionList);
|
||||||
|
|
||||||
|
if (resultDest->writeLocalFile)
|
||||||
|
{
|
||||||
|
WriteToLocalFile(copyOutState->fe_msgbuf, resultDest->fileDesc);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/* close the COPY input */
|
||||||
|
EndRemoteCopy(0, connectionList, true);
|
||||||
|
|
||||||
|
if (resultDest->writeLocalFile)
|
||||||
|
{
|
||||||
|
FileClose(resultDest->fileDesc);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* BroadcastCopyData sends copy data to all connections in a list.
|
||||||
|
*/
|
||||||
|
static void
|
||||||
|
BroadcastCopyData(StringInfo dataBuffer, List *connectionList)
|
||||||
|
{
|
||||||
|
ListCell *connectionCell = NULL;
|
||||||
|
foreach(connectionCell, connectionList)
|
||||||
|
{
|
||||||
|
MultiConnection *connection = (MultiConnection *) lfirst(connectionCell);
|
||||||
|
SendCopyDataOverConnection(dataBuffer, connection);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* SendCopyDataOverConnection sends serialized COPY data over the given
|
||||||
|
* connection.
|
||||||
|
*/
|
||||||
|
static void
|
||||||
|
SendCopyDataOverConnection(StringInfo dataBuffer, MultiConnection *connection)
|
||||||
|
{
|
||||||
|
if (!PutRemoteCopyData(connection, dataBuffer->data, dataBuffer->len))
|
||||||
|
{
|
||||||
|
ReportConnectionError(connection, ERROR);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* RemoteFileDestReceiverDestroy frees memory allocated as part of the
|
||||||
|
* RemoteFileDestReceiver and closes file descriptors.
|
||||||
|
*/
|
||||||
|
static void
|
||||||
|
RemoteFileDestReceiverDestroy(DestReceiver *destReceiver)
|
||||||
|
{
|
||||||
|
RemoteFileDestReceiver *resultDest = (RemoteFileDestReceiver *) destReceiver;
|
||||||
|
|
||||||
|
if (resultDest->copyOutState)
|
||||||
|
{
|
||||||
|
pfree(resultDest->copyOutState);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (resultDest->columnOutputFunctions)
|
||||||
|
{
|
||||||
|
pfree(resultDest->columnOutputFunctions);
|
||||||
|
}
|
||||||
|
|
||||||
|
pfree(resultDest);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* ReceiveQueryResultViaCopy is called when a COPY "resultid" FROM
|
||||||
|
* STDIN WITH (format result) command is received from the client.
|
||||||
|
* The command is followed by the raw copy data stream, which is
|
||||||
|
* redirected to a file.
|
||||||
|
*
|
||||||
|
* File names are automatically prefixed with the user OID. Users
|
||||||
|
* are only allowed to read query results from their own directory.
|
||||||
|
*/
|
||||||
|
void
|
||||||
|
ReceiveQueryResultViaCopy(const char *resultId)
|
||||||
|
{
|
||||||
|
const char *resultFileName = NULL;
|
||||||
|
|
||||||
|
CreateIntermediateResultsDirectory();
|
||||||
|
|
||||||
|
resultFileName = QueryResultFileName(resultId);
|
||||||
|
|
||||||
|
RedirectCopyDataToRegularFile(resultFileName);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* CreateIntermediateResultsDirectory creates the intermediate result
|
||||||
|
* directory for the current transaction if it does not exist and ensures
|
||||||
|
* that the directory is removed at the end of the transaction.
|
||||||
|
*/
|
||||||
|
static char *
|
||||||
|
CreateIntermediateResultsDirectory(void)
|
||||||
|
{
|
||||||
|
char *resultDirectory = IntermediateResultsDirectory();
|
||||||
|
int makeOK = 0;
|
||||||
|
|
||||||
|
if (!CreatedResultsDirectory)
|
||||||
|
{
|
||||||
|
makeOK = mkdir(resultDirectory, S_IRWXU);
|
||||||
|
if (makeOK != 0 && errno != EEXIST)
|
||||||
|
{
|
||||||
|
ereport(ERROR, (errcode_for_file_access(),
|
||||||
|
errmsg("could not create intermediate results directory "
|
||||||
|
"\"%s\": %m",
|
||||||
|
resultDirectory)));
|
||||||
|
}
|
||||||
|
|
||||||
|
CreatedResultsDirectory = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
return resultDirectory;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* QueryResultFileName returns the file name in which to store
|
||||||
|
* an intermediate result with the given key in the per transaction
|
||||||
|
* result directory.
|
||||||
|
*/
|
||||||
|
static char *
|
||||||
|
QueryResultFileName(const char *resultId)
|
||||||
|
{
|
||||||
|
StringInfo resultFileName = makeStringInfo();
|
||||||
|
const char *resultDirectory = IntermediateResultsDirectory();
|
||||||
|
char *checkChar = (char *) resultId;
|
||||||
|
|
||||||
|
for (; *checkChar; checkChar++)
|
||||||
|
{
|
||||||
|
if (!((*checkChar >= 'a' && *checkChar <= 'z') ||
|
||||||
|
(*checkChar >= 'A' && *checkChar <= 'Z') ||
|
||||||
|
(*checkChar >= '0' && *checkChar <= '9') ||
|
||||||
|
(*checkChar == '_') || (*checkChar == '-')))
|
||||||
|
{
|
||||||
|
ereport(ERROR, (errcode(ERRCODE_INVALID_NAME),
|
||||||
|
errmsg("result key \"%s\" contains invalid character",
|
||||||
|
resultId),
|
||||||
|
errhint("Result keys may only contain letters, numbers, "
|
||||||
|
"underscores and hyphens.")));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
appendStringInfo(resultFileName, "%s/%s.data",
|
||||||
|
resultDirectory, resultId);
|
||||||
|
|
||||||
|
return resultFileName->data;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* IntermediateResultsDirectory returns the directory to use for a query result
|
||||||
|
* file with a particular key. The filename includes the user OID, such
|
||||||
|
* that users can never read each other's files.
|
||||||
|
*
|
||||||
|
* In a distributed transaction, the directory has the form:
|
||||||
|
* base/pgsql_job_cache/<user id>_<coordinator node id>_<transaction number>/
|
||||||
|
*
|
||||||
|
* In a non-distributed transaction, the directory has the form:
|
||||||
|
* base/pgsql_job_cache/<user id>_<process id>/
|
||||||
|
*
|
||||||
|
* The latter form can be used for testing COPY ... WITH (format result) without
|
||||||
|
* assigning a distributed transaction ID.
|
||||||
|
*
|
||||||
|
* The pgsql_job_cache directory is emptied on restart in case of failure.
|
||||||
|
*/
|
||||||
|
static char *
|
||||||
|
IntermediateResultsDirectory(void)
|
||||||
|
{
|
||||||
|
StringInfo resultFileName = makeStringInfo();
|
||||||
|
Oid userId = GetUserId();
|
||||||
|
DistributedTransactionId *transactionId = GetCurrentDistributedTransactionId();
|
||||||
|
int initiatorNodeIdentifier = transactionId->initiatorNodeIdentifier;
|
||||||
|
uint64 transactionNumber = transactionId->transactionNumber;
|
||||||
|
|
||||||
|
if (transactionNumber > 0)
|
||||||
|
{
|
||||||
|
appendStringInfo(resultFileName, "base/" PG_JOB_CACHE_DIR "/%u_%u_%lu",
|
||||||
|
userId, initiatorNodeIdentifier, transactionNumber);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
appendStringInfo(resultFileName, "base/" PG_JOB_CACHE_DIR "/%u_%u",
|
||||||
|
userId, MyProcPid);
|
||||||
|
}
|
||||||
|
|
||||||
|
return resultFileName->data;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* RemoveIntermediateResultsDirectory removes the intermediate result directory
|
||||||
|
* for the current distributed transaction, if any was created.
|
||||||
|
*/
|
||||||
|
void
|
||||||
|
RemoveIntermediateResultsDirectory(void)
|
||||||
|
{
|
||||||
|
if (CreatedResultsDirectory)
|
||||||
|
{
|
||||||
|
StringInfo resultsDirectory = makeStringInfo();
|
||||||
|
appendStringInfoString(resultsDirectory, IntermediateResultsDirectory());
|
||||||
|
|
||||||
|
CitusRemoveDirectory(resultsDirectory);
|
||||||
|
|
||||||
|
CreatedResultsDirectory = false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* read_intermediate_result is a UDF that returns a COPY-formatted intermediate
|
||||||
|
* result file as a set of records. The file is parsed according to the columns
|
||||||
|
* definition list specified by the user, e.g.:
|
||||||
|
*
|
||||||
|
* SELECT * FROM read_intermediate_result('foo', 'csv') AS (a int, b int)
|
||||||
|
*
|
||||||
|
* The file is read from the directory returned by IntermediateResultsDirectory,
|
||||||
|
* which includes the user ID.
|
||||||
|
*
|
||||||
|
* read_intermediate_result is a volatile function because it cannot be
|
||||||
|
* evaluated until execution time, but for distributed planning purposes we can
|
||||||
|
* treat it in the same way as immutable functions and reference tables, since
|
||||||
|
* we know it will return the same result on all nodes.
|
||||||
|
*/
|
||||||
|
Datum
|
||||||
|
read_intermediate_result(PG_FUNCTION_ARGS)
|
||||||
|
{
|
||||||
|
ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
|
||||||
|
|
||||||
|
text *resultIdText = PG_GETARG_TEXT_P(0);
|
||||||
|
char *resultIdString = text_to_cstring(resultIdText);
|
||||||
|
Datum copyFormatOidDatum = PG_GETARG_DATUM(1);
|
||||||
|
Datum copyFormatLabelDatum = DirectFunctionCall1(enum_out, copyFormatOidDatum);
|
||||||
|
char *copyFormatLabel = DatumGetCString(copyFormatLabelDatum);
|
||||||
|
|
||||||
|
char *resultFileName = NULL;
|
||||||
|
struct stat fileStat;
|
||||||
|
int statOK = 0;
|
||||||
|
|
||||||
|
Tuplestorestate *tupstore = NULL;
|
||||||
|
TupleDesc tupleDescriptor = NULL;
|
||||||
|
MemoryContext oldcontext = NULL;
|
||||||
|
|
||||||
|
CheckCitusVersion(ERROR);
|
||||||
|
|
||||||
|
resultFileName = QueryResultFileName(resultIdString);
|
||||||
|
statOK = stat(resultFileName, &fileStat);
|
||||||
|
if (statOK != 0)
|
||||||
|
{
|
||||||
|
ereport(ERROR, (errcode_for_file_access(),
|
||||||
|
errmsg("result \"%s\" does not exist", resultIdString)));
|
||||||
|
}
|
||||||
|
|
||||||
|
/* check to see if query supports us returning a tuplestore */
|
||||||
|
if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
|
||||||
|
{
|
||||||
|
ereport(ERROR,
|
||||||
|
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||||
|
errmsg(
|
||||||
|
"set-valued function called in context that cannot accept a set")));
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!(rsinfo->allowedModes & SFRM_Materialize))
|
||||||
|
{
|
||||||
|
ereport(ERROR,
|
||||||
|
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||||
|
errmsg(
|
||||||
|
"materialize mode required, but it is not allowed in this context")));
|
||||||
|
}
|
||||||
|
|
||||||
|
/* get a tuple descriptor for our result type */
|
||||||
|
switch (get_call_result_type(fcinfo, NULL, &tupleDescriptor))
|
||||||
|
{
|
||||||
|
case TYPEFUNC_COMPOSITE:
|
||||||
|
{
|
||||||
|
/* success */
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
case TYPEFUNC_RECORD:
|
||||||
|
{
|
||||||
|
/* failed to determine actual type of RECORD */
|
||||||
|
ereport(ERROR,
|
||||||
|
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||||
|
errmsg("function returning record called in context "
|
||||||
|
"that cannot accept type record")));
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
default:
|
||||||
|
{
|
||||||
|
/* result type isn't composite */
|
||||||
|
elog(ERROR, "return type must be a row type");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
tupleDescriptor = CreateTupleDescCopy(tupleDescriptor);
|
||||||
|
|
||||||
|
oldcontext = MemoryContextSwitchTo(rsinfo->econtext->ecxt_per_query_memory);
|
||||||
|
|
||||||
|
tupstore = tuplestore_begin_heap(true, false, work_mem);
|
||||||
|
rsinfo->returnMode = SFRM_Materialize;
|
||||||
|
rsinfo->setResult = tupstore;
|
||||||
|
rsinfo->setDesc = tupleDescriptor;
|
||||||
|
MemoryContextSwitchTo(oldcontext);
|
||||||
|
|
||||||
|
ReadFileIntoTupleStore(resultFileName, copyFormatLabel, tupleDescriptor, tupstore);
|
||||||
|
|
||||||
|
tuplestore_donestoring(tupstore);
|
||||||
|
|
||||||
|
return (Datum) 0;
|
||||||
|
}
|
|
@ -34,6 +34,7 @@
|
||||||
#include "nodes/makefuncs.h"
|
#include "nodes/makefuncs.h"
|
||||||
#include "parser/parsetree.h"
|
#include "parser/parsetree.h"
|
||||||
#include "storage/lmgr.h"
|
#include "storage/lmgr.h"
|
||||||
|
#include "tcop/pquery.h"
|
||||||
#include "tcop/utility.h"
|
#include "tcop/utility.h"
|
||||||
#include "utils/snapmgr.h"
|
#include "utils/snapmgr.h"
|
||||||
#include "utils/memutils.h"
|
#include "utils/memutils.h"
|
||||||
|
@ -93,29 +94,13 @@ LoadTuplesIntoTupleStore(CitusScanState *citusScanState, Job *workerJob)
|
||||||
{
|
{
|
||||||
CustomScanState customScanState = citusScanState->customScanState;
|
CustomScanState customScanState = citusScanState->customScanState;
|
||||||
List *workerTaskList = workerJob->taskList;
|
List *workerTaskList = workerJob->taskList;
|
||||||
List *copyOptions = NIL;
|
|
||||||
EState *executorState = NULL;
|
|
||||||
MemoryContext executorTupleContext = NULL;
|
|
||||||
ExprContext *executorExpressionContext = NULL;
|
|
||||||
TupleDesc tupleDescriptor = NULL;
|
TupleDesc tupleDescriptor = NULL;
|
||||||
Relation stubRelation = NULL;
|
|
||||||
ListCell *workerTaskCell = NULL;
|
ListCell *workerTaskCell = NULL;
|
||||||
uint32 columnCount = 0;
|
|
||||||
Datum *columnValues = NULL;
|
|
||||||
bool *columnNulls = NULL;
|
|
||||||
bool randomAccess = true;
|
bool randomAccess = true;
|
||||||
bool interTransactions = false;
|
bool interTransactions = false;
|
||||||
|
char *copyFormat = "text";
|
||||||
executorState = citusScanState->customScanState.ss.ps.state;
|
|
||||||
executorTupleContext = GetPerTupleMemoryContext(executorState);
|
|
||||||
executorExpressionContext = GetPerTupleExprContext(executorState);
|
|
||||||
|
|
||||||
tupleDescriptor = customScanState.ss.ps.ps_ResultTupleSlot->tts_tupleDescriptor;
|
tupleDescriptor = customScanState.ss.ps.ps_ResultTupleSlot->tts_tupleDescriptor;
|
||||||
stubRelation = StubRelation(tupleDescriptor);
|
|
||||||
|
|
||||||
columnCount = tupleDescriptor->natts;
|
|
||||||
columnValues = palloc0(columnCount * sizeof(Datum));
|
|
||||||
columnNulls = palloc0(columnCount * sizeof(bool));
|
|
||||||
|
|
||||||
Assert(citusScanState->tuplestorestate == NULL);
|
Assert(citusScanState->tuplestorestate == NULL);
|
||||||
citusScanState->tuplestorestate =
|
citusScanState->tuplestorestate =
|
||||||
|
@ -123,16 +108,7 @@ LoadTuplesIntoTupleStore(CitusScanState *citusScanState, Job *workerJob)
|
||||||
|
|
||||||
if (BinaryMasterCopyFormat)
|
if (BinaryMasterCopyFormat)
|
||||||
{
|
{
|
||||||
DefElem *copyOption = NULL;
|
copyFormat = "binary";
|
||||||
|
|
||||||
#if (PG_VERSION_NUM >= 100000)
|
|
||||||
int location = -1; /* "unknown" token location */
|
|
||||||
copyOption = makeDefElem("format", (Node *) makeString("binary"), location);
|
|
||||||
#else
|
|
||||||
copyOption = makeDefElem("format", (Node *) makeString("binary"));
|
|
||||||
#endif
|
|
||||||
|
|
||||||
copyOptions = lappend(copyOptions, copyOption);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
foreach(workerTaskCell, workerTaskList)
|
foreach(workerTaskCell, workerTaskList)
|
||||||
|
@ -140,42 +116,85 @@ LoadTuplesIntoTupleStore(CitusScanState *citusScanState, Job *workerJob)
|
||||||
Task *workerTask = (Task *) lfirst(workerTaskCell);
|
Task *workerTask = (Task *) lfirst(workerTaskCell);
|
||||||
StringInfo jobDirectoryName = NULL;
|
StringInfo jobDirectoryName = NULL;
|
||||||
StringInfo taskFilename = NULL;
|
StringInfo taskFilename = NULL;
|
||||||
CopyState copyState = NULL;
|
|
||||||
|
|
||||||
jobDirectoryName = MasterJobDirectoryName(workerTask->jobId);
|
jobDirectoryName = MasterJobDirectoryName(workerTask->jobId);
|
||||||
taskFilename = TaskFilename(jobDirectoryName, workerTask->taskId);
|
taskFilename = TaskFilename(jobDirectoryName, workerTask->taskId);
|
||||||
|
|
||||||
|
ReadFileIntoTupleStore(taskFilename->data, copyFormat, tupleDescriptor,
|
||||||
|
citusScanState->tuplestorestate);
|
||||||
|
}
|
||||||
|
|
||||||
|
tuplestore_donestoring(citusScanState->tuplestorestate);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* ReadFileIntoTupleStore parses the records in a COPY-formatted file according
|
||||||
|
* according to the given tuple descriptor and stores the records in a tuple
|
||||||
|
* store.
|
||||||
|
*/
|
||||||
|
void
|
||||||
|
ReadFileIntoTupleStore(char *fileName, char *copyFormat, TupleDesc tupleDescriptor,
|
||||||
|
Tuplestorestate *tupstore)
|
||||||
|
{
|
||||||
|
CopyState copyState = NULL;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Trick BeginCopyFrom into using our tuple descriptor by pretending it belongs
|
||||||
|
* to a relation.
|
||||||
|
*/
|
||||||
|
Relation stubRelation = StubRelation(tupleDescriptor);
|
||||||
|
|
||||||
|
EState *executorState = CreateExecutorState();
|
||||||
|
MemoryContext executorTupleContext = GetPerTupleMemoryContext(executorState);
|
||||||
|
ExprContext *executorExpressionContext = GetPerTupleExprContext(executorState);
|
||||||
|
|
||||||
|
int columnCount = tupleDescriptor->natts;
|
||||||
|
Datum *columnValues = palloc0(columnCount * sizeof(Datum));
|
||||||
|
bool *columnNulls = palloc0(columnCount * sizeof(bool));
|
||||||
|
|
||||||
|
DefElem *copyOption = NULL;
|
||||||
|
List *copyOptions = NIL;
|
||||||
|
|
||||||
#if (PG_VERSION_NUM >= 100000)
|
#if (PG_VERSION_NUM >= 100000)
|
||||||
copyState = BeginCopyFrom(NULL, stubRelation, taskFilename->data, false, NULL,
|
int location = -1; /* "unknown" token location */
|
||||||
NULL, copyOptions);
|
copyOption = makeDefElem("format", (Node *) makeString(copyFormat), location);
|
||||||
#else
|
#else
|
||||||
copyState = BeginCopyFrom(stubRelation, taskFilename->data, false, NULL,
|
copyOption = makeDefElem("format", (Node *) makeString(copyFormat));
|
||||||
copyOptions);
|
#endif
|
||||||
|
copyOptions = lappend(copyOptions, copyOption);
|
||||||
|
|
||||||
|
#if (PG_VERSION_NUM >= 100000)
|
||||||
|
copyState = BeginCopyFrom(NULL, stubRelation, fileName, false, NULL,
|
||||||
|
NULL, copyOptions);
|
||||||
|
#else
|
||||||
|
copyState = BeginCopyFrom(stubRelation, fileName, false, NULL,
|
||||||
|
copyOptions);
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
while (true)
|
while (true)
|
||||||
|
{
|
||||||
|
MemoryContext oldContext = NULL;
|
||||||
|
bool nextRowFound = false;
|
||||||
|
|
||||||
|
ResetPerTupleExprContext(executorState);
|
||||||
|
oldContext = MemoryContextSwitchTo(executorTupleContext);
|
||||||
|
|
||||||
|
nextRowFound = NextCopyFrom(copyState, executorExpressionContext,
|
||||||
|
columnValues, columnNulls, NULL);
|
||||||
|
if (!nextRowFound)
|
||||||
{
|
{
|
||||||
MemoryContext oldContext = NULL;
|
|
||||||
bool nextRowFound = false;
|
|
||||||
|
|
||||||
ResetPerTupleExprContext(executorState);
|
|
||||||
oldContext = MemoryContextSwitchTo(executorTupleContext);
|
|
||||||
|
|
||||||
nextRowFound = NextCopyFrom(copyState, executorExpressionContext,
|
|
||||||
columnValues, columnNulls, NULL);
|
|
||||||
if (!nextRowFound)
|
|
||||||
{
|
|
||||||
MemoryContextSwitchTo(oldContext);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
tuplestore_putvalues(citusScanState->tuplestorestate, tupleDescriptor,
|
|
||||||
columnValues, columnNulls);
|
|
||||||
MemoryContextSwitchTo(oldContext);
|
MemoryContextSwitchTo(oldContext);
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
EndCopyFrom(copyState);
|
tuplestore_putvalues(tupstore, tupleDescriptor, columnValues, columnNulls);
|
||||||
|
MemoryContextSwitchTo(oldContext);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
EndCopyFrom(copyState);
|
||||||
|
pfree(columnValues);
|
||||||
|
pfree(columnNulls);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -195,3 +214,86 @@ StubRelation(TupleDesc tupleDescriptor)
|
||||||
|
|
||||||
return stubRelation;
|
return stubRelation;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* ExecuteQueryStringIntoDestReceiver plans and executes a query and sends results
|
||||||
|
* to the given DestReceiver.
|
||||||
|
*/
|
||||||
|
void
|
||||||
|
ExecuteQueryStringIntoDestReceiver(const char *queryString, ParamListInfo params,
|
||||||
|
DestReceiver *dest)
|
||||||
|
{
|
||||||
|
Query *query = NULL;
|
||||||
|
|
||||||
|
#if (PG_VERSION_NUM >= 100000)
|
||||||
|
RawStmt *rawStmt = (RawStmt *) ParseTreeRawStmt(queryString);
|
||||||
|
List *queryTreeList = pg_analyze_and_rewrite(rawStmt, queryString, NULL, 0, NULL);
|
||||||
|
#else
|
||||||
|
Node *queryTreeNode = ParseTreeNode(queryString);
|
||||||
|
List *queryTreeList = pg_analyze_and_rewrite(queryTreeNode, queryString, NULL, 0);
|
||||||
|
#endif
|
||||||
|
|
||||||
|
if (list_length(queryTreeList) != 1)
|
||||||
|
{
|
||||||
|
ereport(ERROR, (errmsg("can only execute a single query")));
|
||||||
|
}
|
||||||
|
|
||||||
|
query = (Query *) linitial(queryTreeList);
|
||||||
|
|
||||||
|
ExecuteQueryIntoDestReceiver(query, params, dest);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* ExecuteQueryIntoDestReceiver plans and executes a query and sends results to the given
|
||||||
|
* DestReceiver.
|
||||||
|
*/
|
||||||
|
void
|
||||||
|
ExecuteQueryIntoDestReceiver(Query *query, ParamListInfo params, DestReceiver *dest)
|
||||||
|
{
|
||||||
|
PlannedStmt *queryPlan = NULL;
|
||||||
|
int cursorOptions = 0;
|
||||||
|
|
||||||
|
cursorOptions = CURSOR_OPT_PARALLEL_OK;
|
||||||
|
|
||||||
|
/* plan the subquery, this may be another distributed query */
|
||||||
|
queryPlan = pg_plan_query(query, cursorOptions, params);
|
||||||
|
|
||||||
|
ExecutePlanIntoDestReceiver(queryPlan, params, dest);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* ExecuteIntoDestReceiver plans and executes a query and sends results to the given
|
||||||
|
* DestReceiver.
|
||||||
|
*/
|
||||||
|
void
|
||||||
|
ExecutePlanIntoDestReceiver(PlannedStmt *queryPlan, ParamListInfo params,
|
||||||
|
DestReceiver *dest)
|
||||||
|
{
|
||||||
|
Portal portal = NULL;
|
||||||
|
int eflags = 0;
|
||||||
|
long count = FETCH_ALL;
|
||||||
|
|
||||||
|
/* create a new portal for executing the query */
|
||||||
|
portal = CreateNewPortal();
|
||||||
|
|
||||||
|
/* don't display the portal in pg_cursors, it is for internal use only */
|
||||||
|
portal->visible = false;
|
||||||
|
|
||||||
|
PortalDefineQuery(portal,
|
||||||
|
NULL,
|
||||||
|
"",
|
||||||
|
"SELECT",
|
||||||
|
list_make1(queryPlan),
|
||||||
|
NULL);
|
||||||
|
|
||||||
|
PortalStart(portal, params, eflags, GetActiveSnapshot());
|
||||||
|
#if (PG_VERSION_NUM >= 100000)
|
||||||
|
PortalRun(portal, count, false, true, dest, dest, NULL);
|
||||||
|
#else
|
||||||
|
PortalRun(portal, count, false, dest, dest, NULL);
|
||||||
|
#endif
|
||||||
|
PortalDrop(portal, false);
|
||||||
|
}
|
||||||
|
|
|
@ -37,6 +37,7 @@
|
||||||
#include "commands/prepare.h"
|
#include "commands/prepare.h"
|
||||||
#include "distributed/citus_ruleutils.h"
|
#include "distributed/citus_ruleutils.h"
|
||||||
#include "distributed/colocation_utils.h"
|
#include "distributed/colocation_utils.h"
|
||||||
|
#include "distributed/intermediate_results.h"
|
||||||
#include "distributed/maintenanced.h"
|
#include "distributed/maintenanced.h"
|
||||||
#include "distributed/master_metadata_utility.h"
|
#include "distributed/master_metadata_utility.h"
|
||||||
#include "distributed/master_protocol.h"
|
#include "distributed/master_protocol.h"
|
||||||
|
@ -108,6 +109,7 @@ static bool IsCitusExtensionStmt(Node *parsetree);
|
||||||
/* Local functions forward declarations for Transmit statement */
|
/* Local functions forward declarations for Transmit statement */
|
||||||
static bool IsTransmitStmt(Node *parsetree);
|
static bool IsTransmitStmt(Node *parsetree);
|
||||||
static void VerifyTransmitStmt(CopyStmt *copyStatement);
|
static void VerifyTransmitStmt(CopyStmt *copyStatement);
|
||||||
|
static bool IsCopyResultStmt(CopyStmt *copyStatement);
|
||||||
|
|
||||||
/* Local functions forward declarations for processing distributed table commands */
|
/* Local functions forward declarations for processing distributed table commands */
|
||||||
static Node * ProcessCopyStmt(CopyStmt *copyStatement, char *completionTag,
|
static Node * ProcessCopyStmt(CopyStmt *copyStatement, char *completionTag,
|
||||||
|
@ -706,6 +708,34 @@ VerifyTransmitStmt(CopyStmt *copyStatement)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* IsCopyResultStmt determines whether the given copy statement is a
|
||||||
|
* COPY "resultkey" FROM STDIN WITH (format result) statement, which is used
|
||||||
|
* to copy query results from the coordinator into workers.
|
||||||
|
*/
|
||||||
|
static bool
|
||||||
|
IsCopyResultStmt(CopyStmt *copyStatement)
|
||||||
|
{
|
||||||
|
ListCell *optionCell = NULL;
|
||||||
|
bool hasFormatReceive = false;
|
||||||
|
|
||||||
|
/* extract WITH (...) options from the COPY statement */
|
||||||
|
foreach(optionCell, copyStatement->options)
|
||||||
|
{
|
||||||
|
DefElem *defel = (DefElem *) lfirst(optionCell);
|
||||||
|
|
||||||
|
if (strncmp(defel->defname, "format", NAMEDATALEN) == 0 &&
|
||||||
|
strncmp(defGetString(defel), "result", NAMEDATALEN) == 0)
|
||||||
|
{
|
||||||
|
hasFormatReceive = true;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return hasFormatReceive;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* ProcessCopyStmt handles Citus specific concerns for COPY like supporting
|
* ProcessCopyStmt handles Citus specific concerns for COPY like supporting
|
||||||
* COPYing from distributed tables and preventing unsupported actions. The
|
* COPYing from distributed tables and preventing unsupported actions. The
|
||||||
|
@ -723,6 +753,19 @@ ProcessCopyStmt(CopyStmt *copyStatement, char *completionTag, bool *commandMustR
|
||||||
{
|
{
|
||||||
*commandMustRunAsOwner = false; /* make sure variable is initialized */
|
*commandMustRunAsOwner = false; /* make sure variable is initialized */
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Handle special COPY "resultid" FROM STDIN WITH (format result) commands
|
||||||
|
* for sending intermediate results to workers.
|
||||||
|
*/
|
||||||
|
if (IsCopyResultStmt(copyStatement))
|
||||||
|
{
|
||||||
|
const char *resultId = copyStatement->relation->relname;
|
||||||
|
|
||||||
|
ReceiveQueryResultViaCopy(resultId);
|
||||||
|
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* We check whether a distributed relation is affected. For that, we need to open the
|
* We check whether a distributed relation is affected. For that, we need to open the
|
||||||
* relation. To prevent race conditions with later lookups, lock the table, and modify
|
* relation. To prevent race conditions with later lookups, lock the table, and modify
|
||||||
|
|
|
@ -67,7 +67,8 @@ typedef enum RecurringTuplesType
|
||||||
RECURRING_TUPLES_INVALID = 0,
|
RECURRING_TUPLES_INVALID = 0,
|
||||||
RECURRING_TUPLES_REFERENCE_TABLE,
|
RECURRING_TUPLES_REFERENCE_TABLE,
|
||||||
RECURRING_TUPLES_FUNCTION,
|
RECURRING_TUPLES_FUNCTION,
|
||||||
RECURRING_TUPLES_EMPTY_JOIN_TREE
|
RECURRING_TUPLES_EMPTY_JOIN_TREE,
|
||||||
|
RECURRING_TUPLES_RESULT_FUNCTION
|
||||||
} RecurringTuplesType;
|
} RecurringTuplesType;
|
||||||
|
|
||||||
|
|
||||||
|
@ -129,6 +130,8 @@ static bool RelationInfoContainsRecurringTuples(PlannerInfo *plannerInfo,
|
||||||
RelOptInfo *relationInfo,
|
RelOptInfo *relationInfo,
|
||||||
RecurringTuplesType *recurType);
|
RecurringTuplesType *recurType);
|
||||||
static bool HasRecurringTuples(Node *node, RecurringTuplesType *recurType);
|
static bool HasRecurringTuples(Node *node, RecurringTuplesType *recurType);
|
||||||
|
static bool ContainsReadIntermediateResultFunction(Node *node);
|
||||||
|
static bool IsReadIntermediateResultFunction(Node *node);
|
||||||
static void ValidateClauseList(List *clauseList);
|
static void ValidateClauseList(List *clauseList);
|
||||||
static bool ExtractFromExpressionWalker(Node *node,
|
static bool ExtractFromExpressionWalker(Node *node,
|
||||||
QualifierWalkerContext *walkerContext);
|
QualifierWalkerContext *walkerContext);
|
||||||
|
@ -952,6 +955,14 @@ DeferErrorIfUnsupportedSublinkAndReferenceTable(Query *queryTree)
|
||||||
"clause when the query has subqueries in "
|
"clause when the query has subqueries in "
|
||||||
"WHERE clause", NULL);
|
"WHERE clause", NULL);
|
||||||
}
|
}
|
||||||
|
else if (recurType == RECURRING_TUPLES_RESULT_FUNCTION)
|
||||||
|
{
|
||||||
|
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
|
||||||
|
"cannot pushdown the subquery",
|
||||||
|
"Complex subqueries and CTEs are not allowed in "
|
||||||
|
"the FROM clause when the query has subqueries in the "
|
||||||
|
"WHERE clause", NULL);
|
||||||
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
|
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
|
||||||
|
@ -1218,6 +1229,13 @@ DeferErrorIfUnsupportedUnionQuery(Query *subqueryTree,
|
||||||
"Subqueries without a FROM clause are not supported with "
|
"Subqueries without a FROM clause are not supported with "
|
||||||
"union operator", NULL);
|
"union operator", NULL);
|
||||||
}
|
}
|
||||||
|
else if (recurType == RECURRING_TUPLES_RESULT_FUNCTION)
|
||||||
|
{
|
||||||
|
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
|
||||||
|
"cannot push down this subquery",
|
||||||
|
"Complex subqueries and CTEs are not supported within a "
|
||||||
|
"UNION", NULL);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -1297,7 +1315,18 @@ DeferErrorIfUnsupportedTableCombination(Query *queryTree)
|
||||||
}
|
}
|
||||||
else if (rangeTableEntry->rtekind == RTE_FUNCTION)
|
else if (rangeTableEntry->rtekind == RTE_FUNCTION)
|
||||||
{
|
{
|
||||||
if (contain_mutable_functions((Node *) rangeTableEntry->functions))
|
List *functionList = rangeTableEntry->functions;
|
||||||
|
|
||||||
|
if (list_length(functionList) == 1 &&
|
||||||
|
ContainsReadIntermediateResultFunction(linitial(functionList)))
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
* The read_intermediate_result function is volatile, but we know
|
||||||
|
* it has the same result across all nodes and can therefore treat
|
||||||
|
* it as a reference table.
|
||||||
|
*/
|
||||||
|
}
|
||||||
|
else if (contain_mutable_functions((Node *) functionList))
|
||||||
{
|
{
|
||||||
unsupportedTableCombination = true;
|
unsupportedTableCombination = true;
|
||||||
errorDetail = "Only immutable functions can be used as a table "
|
errorDetail = "Only immutable functions can be used as a table "
|
||||||
|
@ -2003,7 +2032,13 @@ DeferredErrorIfUnsupportedRecurringTuplesJoin(
|
||||||
"There exist a subquery without FROM in the outer "
|
"There exist a subquery without FROM in the outer "
|
||||||
"part of the outer join", NULL);
|
"part of the outer join", NULL);
|
||||||
}
|
}
|
||||||
|
else if (recurType == RECURRING_TUPLES_RESULT_FUNCTION)
|
||||||
|
{
|
||||||
|
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
|
||||||
|
"cannot pushdown the subquery",
|
||||||
|
"Complex subqueries and CTEs cannot be in the outer "
|
||||||
|
"part of the outer join", NULL);
|
||||||
|
}
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2141,7 +2176,17 @@ HasRecurringTuples(Node *node, RecurringTuplesType *recurType)
|
||||||
}
|
}
|
||||||
else if (rangeTableEntry->rtekind == RTE_FUNCTION)
|
else if (rangeTableEntry->rtekind == RTE_FUNCTION)
|
||||||
{
|
{
|
||||||
*recurType = RECURRING_TUPLES_FUNCTION;
|
List *functionList = rangeTableEntry->functions;
|
||||||
|
|
||||||
|
if (list_length(functionList) == 1 &&
|
||||||
|
ContainsReadIntermediateResultFunction((Node *) functionList))
|
||||||
|
{
|
||||||
|
*recurType = RECURRING_TUPLES_RESULT_FUNCTION;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
*recurType = RECURRING_TUPLES_FUNCTION;
|
||||||
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Tuples from functions will recur in every query on shards that includes
|
* Tuples from functions will recur in every query on shards that includes
|
||||||
|
@ -2176,6 +2221,38 @@ HasRecurringTuples(Node *node, RecurringTuplesType *recurType)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* ContainsReadIntermediateResultFunction determines whether an expresion tree contains
|
||||||
|
* a call to the read_intermediate_results function.
|
||||||
|
*/
|
||||||
|
static bool
|
||||||
|
ContainsReadIntermediateResultFunction(Node *node)
|
||||||
|
{
|
||||||
|
return FindNodeCheck(node, IsReadIntermediateResultFunction);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* IsReadIntermediateResultFunction determines whether a given node is a function call
|
||||||
|
* to the read_intermediate_result function.
|
||||||
|
*/
|
||||||
|
static bool
|
||||||
|
IsReadIntermediateResultFunction(Node *node)
|
||||||
|
{
|
||||||
|
if (IsA(node, FuncExpr))
|
||||||
|
{
|
||||||
|
FuncExpr *funcExpr = (FuncExpr *) node;
|
||||||
|
|
||||||
|
if (funcExpr->funcid == CitusReadIntermediateResultFuncId())
|
||||||
|
{
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* ErrorIfQueryNotSupported checks that we can perform distributed planning for
|
* ErrorIfQueryNotSupported checks that we can perform distributed planning for
|
||||||
* the given query. The checks in this function will be removed as we support
|
* the given query. The checks in this function will be removed as we support
|
||||||
|
|
|
@ -22,6 +22,7 @@
|
||||||
#include "distributed/backend_data.h"
|
#include "distributed/backend_data.h"
|
||||||
#include "distributed/connection_management.h"
|
#include "distributed/connection_management.h"
|
||||||
#include "distributed/hash_helpers.h"
|
#include "distributed/hash_helpers.h"
|
||||||
|
#include "distributed/intermediate_results.h"
|
||||||
#include "distributed/multi_shard_transaction.h"
|
#include "distributed/multi_shard_transaction.h"
|
||||||
#include "distributed/transaction_management.h"
|
#include "distributed/transaction_management.h"
|
||||||
#include "distributed/placement_connection.h"
|
#include "distributed/placement_connection.h"
|
||||||
|
@ -154,6 +155,7 @@ CoordinatedTransactionCallback(XactEvent event, void *arg)
|
||||||
* transaction management. Do so before doing other work, so the
|
* transaction management. Do so before doing other work, so the
|
||||||
* callbacks still can perform work if needed.
|
* callbacks still can perform work if needed.
|
||||||
*/
|
*/
|
||||||
|
RemoveIntermediateResultsDirectory();
|
||||||
ResetShardPlacementTransactionState();
|
ResetShardPlacementTransactionState();
|
||||||
|
|
||||||
if (CurrentCoordinatedTransactionState == COORD_TRANS_PREPARED)
|
if (CurrentCoordinatedTransactionState == COORD_TRANS_PREPARED)
|
||||||
|
@ -191,6 +193,7 @@ CoordinatedTransactionCallback(XactEvent event, void *arg)
|
||||||
* transaction management. Do so before doing other work, so the
|
* transaction management. Do so before doing other work, so the
|
||||||
* callbacks still can perform work if needed.
|
* callbacks still can perform work if needed.
|
||||||
*/
|
*/
|
||||||
|
RemoveIntermediateResultsDirectory();
|
||||||
ResetShardPlacementTransactionState();
|
ResetShardPlacementTransactionState();
|
||||||
|
|
||||||
/* handles both already prepared and open transactions */
|
/* handles both already prepared and open transactions */
|
||||||
|
|
|
@ -117,6 +117,7 @@ typedef struct MetadataCacheData
|
||||||
Oid distTransactionRelationId;
|
Oid distTransactionRelationId;
|
||||||
Oid distTransactionGroupIndexId;
|
Oid distTransactionGroupIndexId;
|
||||||
Oid distTransactionRecordIndexId;
|
Oid distTransactionRecordIndexId;
|
||||||
|
Oid readIntermediateResultFuncId;
|
||||||
Oid extraDataContainerFuncId;
|
Oid extraDataContainerFuncId;
|
||||||
Oid workerHashFunctionId;
|
Oid workerHashFunctionId;
|
||||||
Oid extensionOwner;
|
Oid extensionOwner;
|
||||||
|
@ -1834,6 +1835,31 @@ DistPlacementGroupidIndexId(void)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/* return oid of the read_intermediate_result(text,citus.copy_format) function */
|
||||||
|
Oid
|
||||||
|
CitusReadIntermediateResultFuncId(void)
|
||||||
|
{
|
||||||
|
if (MetadataCache.readIntermediateResultFuncId == InvalidOid)
|
||||||
|
{
|
||||||
|
bool missingOK = false;
|
||||||
|
|
||||||
|
List *copyFormatTypeNameList = list_make2(makeString("citus"),
|
||||||
|
makeString("copy_format"));
|
||||||
|
TypeName *copyFormatTypeName = makeTypeNameFromNameList(copyFormatTypeNameList);
|
||||||
|
Oid copyFormatTypeOid = LookupTypeNameOid(NULL, copyFormatTypeName, missingOK);
|
||||||
|
|
||||||
|
List *functionNameList = list_make2(makeString("pg_catalog"),
|
||||||
|
makeString("read_intermediate_result"));
|
||||||
|
Oid paramOids[2] = { TEXTOID, copyFormatTypeOid };
|
||||||
|
|
||||||
|
MetadataCache.readIntermediateResultFuncId =
|
||||||
|
LookupFuncName(functionNameList, 2, paramOids, missingOK);
|
||||||
|
}
|
||||||
|
|
||||||
|
return MetadataCache.readIntermediateResultFuncId;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/* return oid of the citus_extradata_container(internal) function */
|
/* return oid of the citus_extradata_container(internal) function */
|
||||||
Oid
|
Oid
|
||||||
CitusExtraDataContainerFuncId(void)
|
CitusExtraDataContainerFuncId(void)
|
||||||
|
|
|
@ -0,0 +1,29 @@
|
||||||
|
/*-------------------------------------------------------------------------
|
||||||
|
*
|
||||||
|
* intermediate_results.h
|
||||||
|
* Functions for writing and reading intermediate results.
|
||||||
|
*
|
||||||
|
* Copyright (c) 2017, Citus Data, Inc.
|
||||||
|
*
|
||||||
|
*-------------------------------------------------------------------------
|
||||||
|
*/
|
||||||
|
|
||||||
|
#ifndef INTERMEDIATE_RESULTS_H
|
||||||
|
#define INTERMEDIATE_RESULTS_H
|
||||||
|
|
||||||
|
|
||||||
|
#include "fmgr.h"
|
||||||
|
|
||||||
|
#include "distributed/multi_copy.h"
|
||||||
|
#include "nodes/execnodes.h"
|
||||||
|
#include "nodes/execnodes.h"
|
||||||
|
#include "nodes/pg_list.h"
|
||||||
|
#include "tcop/dest.h"
|
||||||
|
#include "utils/palloc.h"
|
||||||
|
|
||||||
|
|
||||||
|
extern void ReceiveQueryResultViaCopy(const char *resultId);
|
||||||
|
extern void RemoveIntermediateResultsDirectory(void);
|
||||||
|
|
||||||
|
|
||||||
|
#endif /* INTERMEDIATE_RESULTS_H */
|
|
@ -124,6 +124,7 @@ extern Oid DistTransactionRecordIndexId(void);
|
||||||
extern Oid DistPlacementGroupidIndexId(void);
|
extern Oid DistPlacementGroupidIndexId(void);
|
||||||
|
|
||||||
/* function oids */
|
/* function oids */
|
||||||
|
extern Oid CitusReadIntermediateResultFuncId(void);
|
||||||
extern Oid CitusExtraDataContainerFuncId(void);
|
extern Oid CitusExtraDataContainerFuncId(void);
|
||||||
extern Oid CitusWorkerHashFunctionId(void);
|
extern Oid CitusWorkerHashFunctionId(void);
|
||||||
|
|
||||||
|
|
|
@ -115,6 +115,7 @@ extern CitusCopyDestReceiver * CreateCitusCopyDestReceiver(Oid relationId,
|
||||||
EState *executorState,
|
EState *executorState,
|
||||||
bool stopOnFailure);
|
bool stopOnFailure);
|
||||||
extern FmgrInfo * ColumnOutputFunctions(TupleDesc rowDescriptor, bool binaryFormat);
|
extern FmgrInfo * ColumnOutputFunctions(TupleDesc rowDescriptor, bool binaryFormat);
|
||||||
|
extern bool CanUseBinaryCopyFormat(TupleDesc tupleDescription);
|
||||||
extern void AppendCopyRowData(Datum *valueArray, bool *isNullArray,
|
extern void AppendCopyRowData(Datum *valueArray, bool *isNullArray,
|
||||||
TupleDesc rowDescriptor,
|
TupleDesc rowDescriptor,
|
||||||
CopyOutState rowOutputState,
|
CopyOutState rowOutputState,
|
||||||
|
@ -122,6 +123,7 @@ extern void AppendCopyRowData(Datum *valueArray, bool *isNullArray,
|
||||||
CopyCoercionData *columnCoercionPaths);
|
CopyCoercionData *columnCoercionPaths);
|
||||||
extern void AppendCopyBinaryHeaders(CopyOutState headerOutputState);
|
extern void AppendCopyBinaryHeaders(CopyOutState headerOutputState);
|
||||||
extern void AppendCopyBinaryFooters(CopyOutState footerOutputState);
|
extern void AppendCopyBinaryFooters(CopyOutState footerOutputState);
|
||||||
|
extern void EndRemoteCopy(int64 shardId, List *connectionList, bool stopOnFailure);
|
||||||
extern void CitusCopyFrom(CopyStmt *copyStatement, char *completionTag);
|
extern void CitusCopyFrom(CopyStmt *copyStatement, char *completionTag);
|
||||||
extern bool IsCopyFromWorker(CopyStmt *copyStatement);
|
extern bool IsCopyFromWorker(CopyStmt *copyStatement);
|
||||||
extern NodeAddress * MasterNodeAddress(CopyStmt *copyStatement);
|
extern NodeAddress * MasterNodeAddress(CopyStmt *copyStatement);
|
||||||
|
|
|
@ -30,6 +30,15 @@ extern int MultiShardConnectionType;
|
||||||
|
|
||||||
extern TupleTableSlot * ReturnTupleFromTuplestore(CitusScanState *scanState);
|
extern TupleTableSlot * ReturnTupleFromTuplestore(CitusScanState *scanState);
|
||||||
extern void LoadTuplesIntoTupleStore(CitusScanState *citusScanState, Job *workerJob);
|
extern void LoadTuplesIntoTupleStore(CitusScanState *citusScanState, Job *workerJob);
|
||||||
|
extern void ReadFileIntoTupleStore(char *fileName, char *copyFormat, TupleDesc
|
||||||
|
tupleDescriptor, Tuplestorestate *tupstore);
|
||||||
|
extern void ExecuteQueryStringIntoDestReceiver(const char *queryString, ParamListInfo
|
||||||
|
params,
|
||||||
|
DestReceiver *dest);
|
||||||
|
extern void ExecuteQueryIntoDestReceiver(Query *query, ParamListInfo params,
|
||||||
|
DestReceiver *dest);
|
||||||
|
extern void ExecutePlanIntoDestReceiver(PlannedStmt *queryPlan, ParamListInfo params,
|
||||||
|
DestReceiver *dest);
|
||||||
|
|
||||||
|
|
||||||
#endif /* MULTI_EXECUTOR_H */
|
#endif /* MULTI_EXECUTOR_H */
|
||||||
|
|
|
@ -12,11 +12,13 @@
|
||||||
#define TRANSMIT_H
|
#define TRANSMIT_H
|
||||||
|
|
||||||
#include "lib/stringinfo.h"
|
#include "lib/stringinfo.h"
|
||||||
|
#include "storage/fd.h"
|
||||||
|
|
||||||
|
|
||||||
/* Function declarations for transmitting files between two nodes */
|
/* Function declarations for transmitting files between two nodes */
|
||||||
extern void RedirectCopyDataToRegularFile(const char *filename);
|
extern void RedirectCopyDataToRegularFile(const char *filename);
|
||||||
extern void SendRegularFile(const char *filename);
|
extern void SendRegularFile(const char *filename);
|
||||||
|
extern File FileOpenForTransmit(const char *filename, int fileFlags, int fileMode);
|
||||||
|
|
||||||
/* Function declaration local to commands and worker modules */
|
/* Function declaration local to commands and worker modules */
|
||||||
extern void FreeStringInfo(StringInfo stringInfo);
|
extern void FreeStringInfo(StringInfo stringInfo);
|
||||||
|
|
|
@ -0,0 +1,209 @@
|
||||||
|
-- Test functions for copying intermediate results
|
||||||
|
CREATE SCHEMA intermediate_results;
|
||||||
|
SET search_path TO 'intermediate_results';
|
||||||
|
-- in the same transaction we can read a result
|
||||||
|
BEGIN;
|
||||||
|
SELECT create_intermediate_result('squares', 'SELECT s, s*s FROM generate_series(1,5) s');
|
||||||
|
create_intermediate_result
|
||||||
|
----------------------------
|
||||||
|
5
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT * FROM read_intermediate_result('squares', 'binary') AS res (x int, x2 int);
|
||||||
|
x | x2
|
||||||
|
---+----
|
||||||
|
1 | 1
|
||||||
|
2 | 4
|
||||||
|
3 | 9
|
||||||
|
4 | 16
|
||||||
|
5 | 25
|
||||||
|
(5 rows)
|
||||||
|
|
||||||
|
COMMIT;
|
||||||
|
-- in separate transactions, the result is no longer available
|
||||||
|
SELECT create_intermediate_result('squares', 'SELECT s, s*s FROM generate_series(1,5) s');
|
||||||
|
create_intermediate_result
|
||||||
|
----------------------------
|
||||||
|
5
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT * FROM read_intermediate_result('squares', 'binary') AS res (x int, x2 int);
|
||||||
|
ERROR: result "squares" does not exist
|
||||||
|
BEGIN;
|
||||||
|
CREATE TABLE interesting_squares (user_id text, interested_in text);
|
||||||
|
SELECT create_distributed_table('interesting_squares', 'user_id');
|
||||||
|
create_distributed_table
|
||||||
|
--------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
INSERT INTO interesting_squares VALUES ('jon', '2'), ('jon', '5'), ('jack', '3');
|
||||||
|
-- put an intermediate result on all workers
|
||||||
|
SELECT broadcast_intermediate_result('squares', 'SELECT s, s*s FROM generate_series(1,5) s');
|
||||||
|
broadcast_intermediate_result
|
||||||
|
-------------------------------
|
||||||
|
5
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- query the intermediate result in a router query
|
||||||
|
SELECT x, x2
|
||||||
|
FROM interesting_squares JOIN (SELECT * FROM read_intermediate_result('squares', 'binary') AS res (x int, x2 int)) squares ON (x::text = interested_in)
|
||||||
|
WHERE user_id = 'jon'
|
||||||
|
ORDER BY x;
|
||||||
|
x | x2
|
||||||
|
---+----
|
||||||
|
2 | 4
|
||||||
|
5 | 25
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
|
END;
|
||||||
|
BEGIN;
|
||||||
|
-- put an intermediate result on all workers
|
||||||
|
SELECT broadcast_intermediate_result('squares', 'SELECT s, s*s FROM generate_series(1,5) s');
|
||||||
|
broadcast_intermediate_result
|
||||||
|
-------------------------------
|
||||||
|
5
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- query the intermediate result in a distributed query
|
||||||
|
SELECT x, x2
|
||||||
|
FROM interesting_squares
|
||||||
|
JOIN (SELECT * FROM read_intermediate_result('squares', 'binary') AS res (x int, x2 int)) squares ON (x::text = interested_in)
|
||||||
|
ORDER BY x;
|
||||||
|
x | x2
|
||||||
|
---+----
|
||||||
|
2 | 4
|
||||||
|
3 | 9
|
||||||
|
5 | 25
|
||||||
|
(3 rows)
|
||||||
|
|
||||||
|
END;
|
||||||
|
-- files should now be cleaned up
|
||||||
|
SELECT x, x2
|
||||||
|
FROM interesting_squares JOIN (SELECT * FROM read_intermediate_result('squares', 'binary') AS res (x text, x2 int)) squares ON (x = interested_in)
|
||||||
|
WHERE user_id = 'jon'
|
||||||
|
ORDER BY x;
|
||||||
|
WARNING: result "squares" does not exist
|
||||||
|
CONTEXT: while executing command on localhost:57638
|
||||||
|
WARNING: result "squares" does not exist
|
||||||
|
CONTEXT: while executing command on localhost:57637
|
||||||
|
ERROR: could not receive query results
|
||||||
|
-- try to read the file as text, will fail because of binary encoding
|
||||||
|
BEGIN;
|
||||||
|
SELECT create_intermediate_result('squares', 'SELECT s, s*s FROM generate_series(1,5) s');
|
||||||
|
create_intermediate_result
|
||||||
|
----------------------------
|
||||||
|
5
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT * FROM read_intermediate_result('squares', 'binary') AS res (x text, x2 int);
|
||||||
|
ERROR: invalid byte sequence for encoding "UTF8": 0x00
|
||||||
|
END;
|
||||||
|
-- try to read the file with wrong encoding
|
||||||
|
BEGIN;
|
||||||
|
SELECT create_intermediate_result('squares', 'SELECT s, s*s FROM generate_series(1,5) s');
|
||||||
|
create_intermediate_result
|
||||||
|
----------------------------
|
||||||
|
5
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT * FROM read_intermediate_result('squares', 'csv') AS res (x int, x2 int);
|
||||||
|
ERROR: invalid input syntax for integer: "PGCOPY"
|
||||||
|
END;
|
||||||
|
-- try a composite type
|
||||||
|
CREATE TYPE intermediate_results.square_type AS (x text, x2 int);
|
||||||
|
SELECT run_command_on_workers('CREATE TYPE intermediate_results.square_type AS (x text, x2 int)');
|
||||||
|
run_command_on_workers
|
||||||
|
-----------------------------------
|
||||||
|
(localhost,57637,t,"CREATE TYPE")
|
||||||
|
(localhost,57638,t,"CREATE TYPE")
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
|
CREATE TABLE stored_squares (user_id text, square intermediate_results.square_type, metadata jsonb);
|
||||||
|
INSERT INTO stored_squares VALUES ('jon', '(2,4)'::intermediate_results.square_type, '{"value":2}');
|
||||||
|
INSERT INTO stored_squares VALUES ('jon', '(3,9)'::intermediate_results.square_type, '{"value":3}');
|
||||||
|
INSERT INTO stored_squares VALUES ('jon', '(4,16)'::intermediate_results.square_type, '{"value":4}');
|
||||||
|
INSERT INTO stored_squares VALUES ('jon', '(5,25)'::intermediate_results.square_type, '{"value":5}');
|
||||||
|
-- composite types change the format to text
|
||||||
|
BEGIN;
|
||||||
|
SELECT create_intermediate_result('stored_squares', 'SELECT square FROM stored_squares');
|
||||||
|
create_intermediate_result
|
||||||
|
----------------------------
|
||||||
|
4
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT * FROM read_intermediate_result('stored_squares', 'binary') AS res (s intermediate_results.square_type);
|
||||||
|
ERROR: COPY file signature not recognized
|
||||||
|
COMMIT;
|
||||||
|
BEGIN;
|
||||||
|
SELECT create_intermediate_result('stored_squares', 'SELECT square FROM stored_squares');
|
||||||
|
create_intermediate_result
|
||||||
|
----------------------------
|
||||||
|
4
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT * FROM read_intermediate_result('stored_squares', 'text') AS res (s intermediate_results.square_type);
|
||||||
|
s
|
||||||
|
--------
|
||||||
|
(2,4)
|
||||||
|
(3,9)
|
||||||
|
(4,16)
|
||||||
|
(5,25)
|
||||||
|
(4 rows)
|
||||||
|
|
||||||
|
COMMIT;
|
||||||
|
BEGIN;
|
||||||
|
-- put an intermediate result in text format on all workers
|
||||||
|
SELECT broadcast_intermediate_result('stored_squares', 'SELECT square, metadata FROM stored_squares');
|
||||||
|
broadcast_intermediate_result
|
||||||
|
-------------------------------
|
||||||
|
4
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- query the intermediate result in a router query using text format
|
||||||
|
SELECT * FROM interesting_squares JOIN (
|
||||||
|
SELECT * FROM
|
||||||
|
read_intermediate_result('stored_squares', 'text') AS res (s intermediate_results.square_type, m jsonb)
|
||||||
|
) squares
|
||||||
|
ON ((s).x = interested_in) WHERE user_id = 'jon' ORDER BY 1,2;
|
||||||
|
user_id | interested_in | s | m
|
||||||
|
---------+---------------+--------+--------------
|
||||||
|
jon | 2 | (2,4) | {"value": 2}
|
||||||
|
jon | 5 | (5,25) | {"value": 5}
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
|
-- query the intermediate result in a real-time query using text format
|
||||||
|
SELECT * FROM interesting_squares JOIN (
|
||||||
|
SELECT * FROM
|
||||||
|
read_intermediate_result('stored_squares', 'text') AS res (s intermediate_results.square_type, m jsonb)
|
||||||
|
) squares
|
||||||
|
ON ((s).x = interested_in) ORDER BY 1,2;
|
||||||
|
user_id | interested_in | s | m
|
||||||
|
---------+---------------+--------+--------------
|
||||||
|
jack | 3 | (3,9) | {"value": 3}
|
||||||
|
jon | 2 | (2,4) | {"value": 2}
|
||||||
|
jon | 5 | (5,25) | {"value": 5}
|
||||||
|
(3 rows)
|
||||||
|
|
||||||
|
END;
|
||||||
|
-- pipe query output into a result file and create a table to check the result
|
||||||
|
COPY (SELECT s, s*s FROM generate_series(1,5) s)
|
||||||
|
TO PROGRAM
|
||||||
|
$$psql -h localhost -p 57636 -U postgres -d regression -c "BEGIN; COPY squares FROM STDIN WITH (format result); CREATE TABLE intermediate_results.squares AS SELECT * FROM read_intermediate_result('squares', 'binary') AS res(x int, x2 int); END;"$$
|
||||||
|
WITH (FORMAT binary);
|
||||||
|
SELECT * FROM squares ORDER BY x;
|
||||||
|
x | x2
|
||||||
|
---+----
|
||||||
|
1 | 1
|
||||||
|
2 | 4
|
||||||
|
3 | 9
|
||||||
|
4 | 16
|
||||||
|
5 | 25
|
||||||
|
(5 rows)
|
||||||
|
|
||||||
|
DROP SCHEMA intermediate_results CASCADE;
|
||||||
|
NOTICE: drop cascades to 4 other objects
|
||||||
|
DETAIL: drop cascades to table interesting_squares
|
||||||
|
drop cascades to type square_type
|
||||||
|
drop cascades to table stored_squares
|
||||||
|
drop cascades to table squares
|
|
@ -129,6 +129,8 @@ ALTER EXTENSION citus UPDATE TO '7.1-1';
|
||||||
ALTER EXTENSION citus UPDATE TO '7.1-2';
|
ALTER EXTENSION citus UPDATE TO '7.1-2';
|
||||||
ALTER EXTENSION citus UPDATE TO '7.1-3';
|
ALTER EXTENSION citus UPDATE TO '7.1-3';
|
||||||
ALTER EXTENSION citus UPDATE TO '7.1-4';
|
ALTER EXTENSION citus UPDATE TO '7.1-4';
|
||||||
|
ALTER EXTENSION citus UPDATE TO '7.2-1';
|
||||||
|
ALTER EXTENSION citus UPDATE TO '7.2-2';
|
||||||
-- show running version
|
-- show running version
|
||||||
SHOW citus.version;
|
SHOW citus.version;
|
||||||
citus.version
|
citus.version
|
||||||
|
|
|
@ -41,7 +41,7 @@ test: multi_partitioning_utils multi_partitioning
|
||||||
# ----------
|
# ----------
|
||||||
# Miscellaneous tests to check our query planning behavior
|
# Miscellaneous tests to check our query planning behavior
|
||||||
# ----------
|
# ----------
|
||||||
test: multi_deparse_shard_query multi_distributed_transaction_id multi_real_time_transaction
|
test: multi_deparse_shard_query multi_distributed_transaction_id multi_real_time_transaction intermediate_results
|
||||||
test: multi_explain
|
test: multi_explain
|
||||||
test: multi_basic_queries multi_complex_expressions multi_subquery multi_subquery_complex_queries multi_subquery_behavioral_analytics
|
test: multi_basic_queries multi_complex_expressions multi_subquery multi_subquery_complex_queries multi_subquery_behavioral_analytics
|
||||||
test: multi_subquery_complex_reference_clause multi_subquery_window_functions multi_view multi_sql_function multi_prepare_sql
|
test: multi_subquery_complex_reference_clause multi_subquery_window_functions multi_view multi_sql_function multi_prepare_sql
|
||||||
|
|
|
@ -0,0 +1,108 @@
|
||||||
|
-- Test functions for copying intermediate results
|
||||||
|
CREATE SCHEMA intermediate_results;
|
||||||
|
SET search_path TO 'intermediate_results';
|
||||||
|
|
||||||
|
-- in the same transaction we can read a result
|
||||||
|
BEGIN;
|
||||||
|
SELECT create_intermediate_result('squares', 'SELECT s, s*s FROM generate_series(1,5) s');
|
||||||
|
SELECT * FROM read_intermediate_result('squares', 'binary') AS res (x int, x2 int);
|
||||||
|
COMMIT;
|
||||||
|
|
||||||
|
-- in separate transactions, the result is no longer available
|
||||||
|
SELECT create_intermediate_result('squares', 'SELECT s, s*s FROM generate_series(1,5) s');
|
||||||
|
SELECT * FROM read_intermediate_result('squares', 'binary') AS res (x int, x2 int);
|
||||||
|
|
||||||
|
BEGIN;
|
||||||
|
CREATE TABLE interesting_squares (user_id text, interested_in text);
|
||||||
|
SELECT create_distributed_table('interesting_squares', 'user_id');
|
||||||
|
INSERT INTO interesting_squares VALUES ('jon', '2'), ('jon', '5'), ('jack', '3');
|
||||||
|
|
||||||
|
-- put an intermediate result on all workers
|
||||||
|
SELECT broadcast_intermediate_result('squares', 'SELECT s, s*s FROM generate_series(1,5) s');
|
||||||
|
|
||||||
|
-- query the intermediate result in a router query
|
||||||
|
SELECT x, x2
|
||||||
|
FROM interesting_squares JOIN (SELECT * FROM read_intermediate_result('squares', 'binary') AS res (x int, x2 int)) squares ON (x::text = interested_in)
|
||||||
|
WHERE user_id = 'jon'
|
||||||
|
ORDER BY x;
|
||||||
|
|
||||||
|
END;
|
||||||
|
|
||||||
|
BEGIN;
|
||||||
|
-- put an intermediate result on all workers
|
||||||
|
SELECT broadcast_intermediate_result('squares', 'SELECT s, s*s FROM generate_series(1,5) s');
|
||||||
|
-- query the intermediate result in a distributed query
|
||||||
|
SELECT x, x2
|
||||||
|
FROM interesting_squares
|
||||||
|
JOIN (SELECT * FROM read_intermediate_result('squares', 'binary') AS res (x int, x2 int)) squares ON (x::text = interested_in)
|
||||||
|
ORDER BY x;
|
||||||
|
END;
|
||||||
|
|
||||||
|
-- files should now be cleaned up
|
||||||
|
SELECT x, x2
|
||||||
|
FROM interesting_squares JOIN (SELECT * FROM read_intermediate_result('squares', 'binary') AS res (x text, x2 int)) squares ON (x = interested_in)
|
||||||
|
WHERE user_id = 'jon'
|
||||||
|
ORDER BY x;
|
||||||
|
|
||||||
|
-- try to read the file as text, will fail because of binary encoding
|
||||||
|
BEGIN;
|
||||||
|
SELECT create_intermediate_result('squares', 'SELECT s, s*s FROM generate_series(1,5) s');
|
||||||
|
SELECT * FROM read_intermediate_result('squares', 'binary') AS res (x text, x2 int);
|
||||||
|
END;
|
||||||
|
|
||||||
|
-- try to read the file with wrong encoding
|
||||||
|
BEGIN;
|
||||||
|
SELECT create_intermediate_result('squares', 'SELECT s, s*s FROM generate_series(1,5) s');
|
||||||
|
SELECT * FROM read_intermediate_result('squares', 'csv') AS res (x int, x2 int);
|
||||||
|
END;
|
||||||
|
|
||||||
|
-- try a composite type
|
||||||
|
CREATE TYPE intermediate_results.square_type AS (x text, x2 int);
|
||||||
|
SELECT run_command_on_workers('CREATE TYPE intermediate_results.square_type AS (x text, x2 int)');
|
||||||
|
|
||||||
|
CREATE TABLE stored_squares (user_id text, square intermediate_results.square_type, metadata jsonb);
|
||||||
|
INSERT INTO stored_squares VALUES ('jon', '(2,4)'::intermediate_results.square_type, '{"value":2}');
|
||||||
|
INSERT INTO stored_squares VALUES ('jon', '(3,9)'::intermediate_results.square_type, '{"value":3}');
|
||||||
|
INSERT INTO stored_squares VALUES ('jon', '(4,16)'::intermediate_results.square_type, '{"value":4}');
|
||||||
|
INSERT INTO stored_squares VALUES ('jon', '(5,25)'::intermediate_results.square_type, '{"value":5}');
|
||||||
|
|
||||||
|
-- composite types change the format to text
|
||||||
|
BEGIN;
|
||||||
|
SELECT create_intermediate_result('stored_squares', 'SELECT square FROM stored_squares');
|
||||||
|
SELECT * FROM read_intermediate_result('stored_squares', 'binary') AS res (s intermediate_results.square_type);
|
||||||
|
COMMIT;
|
||||||
|
|
||||||
|
BEGIN;
|
||||||
|
SELECT create_intermediate_result('stored_squares', 'SELECT square FROM stored_squares');
|
||||||
|
SELECT * FROM read_intermediate_result('stored_squares', 'text') AS res (s intermediate_results.square_type);
|
||||||
|
COMMIT;
|
||||||
|
|
||||||
|
BEGIN;
|
||||||
|
-- put an intermediate result in text format on all workers
|
||||||
|
SELECT broadcast_intermediate_result('stored_squares', 'SELECT square, metadata FROM stored_squares');
|
||||||
|
|
||||||
|
-- query the intermediate result in a router query using text format
|
||||||
|
SELECT * FROM interesting_squares JOIN (
|
||||||
|
SELECT * FROM
|
||||||
|
read_intermediate_result('stored_squares', 'text') AS res (s intermediate_results.square_type, m jsonb)
|
||||||
|
) squares
|
||||||
|
ON ((s).x = interested_in) WHERE user_id = 'jon' ORDER BY 1,2;
|
||||||
|
|
||||||
|
-- query the intermediate result in a real-time query using text format
|
||||||
|
SELECT * FROM interesting_squares JOIN (
|
||||||
|
SELECT * FROM
|
||||||
|
read_intermediate_result('stored_squares', 'text') AS res (s intermediate_results.square_type, m jsonb)
|
||||||
|
) squares
|
||||||
|
ON ((s).x = interested_in) ORDER BY 1,2;
|
||||||
|
|
||||||
|
END;
|
||||||
|
|
||||||
|
-- pipe query output into a result file and create a table to check the result
|
||||||
|
COPY (SELECT s, s*s FROM generate_series(1,5) s)
|
||||||
|
TO PROGRAM
|
||||||
|
$$psql -h localhost -p 57636 -U postgres -d regression -c "BEGIN; COPY squares FROM STDIN WITH (format result); CREATE TABLE intermediate_results.squares AS SELECT * FROM read_intermediate_result('squares', 'binary') AS res(x int, x2 int); END;"$$
|
||||||
|
WITH (FORMAT binary);
|
||||||
|
|
||||||
|
SELECT * FROM squares ORDER BY x;
|
||||||
|
|
||||||
|
DROP SCHEMA intermediate_results CASCADE;
|
|
@ -129,6 +129,8 @@ ALTER EXTENSION citus UPDATE TO '7.1-1';
|
||||||
ALTER EXTENSION citus UPDATE TO '7.1-2';
|
ALTER EXTENSION citus UPDATE TO '7.1-2';
|
||||||
ALTER EXTENSION citus UPDATE TO '7.1-3';
|
ALTER EXTENSION citus UPDATE TO '7.1-3';
|
||||||
ALTER EXTENSION citus UPDATE TO '7.1-4';
|
ALTER EXTENSION citus UPDATE TO '7.1-4';
|
||||||
|
ALTER EXTENSION citus UPDATE TO '7.2-1';
|
||||||
|
ALTER EXTENSION citus UPDATE TO '7.2-2';
|
||||||
|
|
||||||
-- show running version
|
-- show running version
|
||||||
SHOW citus.version;
|
SHOW citus.version;
|
||||||
|
|
Loading…
Reference in New Issue