Merge pull request #3304 from citusdata/read_intermediate_results

Implement read_intermediate_results
pull/3319/head
Hadi Moshayedi 2019-12-17 14:08:39 -08:00 committed by GitHub
commit e96201c609
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 505 additions and 80 deletions

View File

@ -92,10 +92,14 @@ static void RemoteFileDestReceiverDestroy(DestReceiver *destReceiver);
static char * CreateIntermediateResultsDirectory(void); static char * CreateIntermediateResultsDirectory(void);
static char * IntermediateResultsDirectory(void); static char * IntermediateResultsDirectory(void);
static char * QueryResultFileName(const char *resultId); static char * QueryResultFileName(const char *resultId);
static void ReadIntermediateResultsIntoFuncOutput(FunctionCallInfo fcinfo,
char *copyFormat,
Datum *resultIdArray,
int resultCount);
/* exports for SQL callable functions */ /* exports for SQL callable functions */
PG_FUNCTION_INFO_V1(read_intermediate_result); PG_FUNCTION_INFO_V1(read_intermediate_result);
PG_FUNCTION_INFO_V1(read_intermediate_result_array);
PG_FUNCTION_INFO_V1(broadcast_intermediate_result); PG_FUNCTION_INFO_V1(broadcast_intermediate_result);
PG_FUNCTION_INFO_V1(create_intermediate_result); PG_FUNCTION_INFO_V1(create_intermediate_result);
@ -693,31 +697,75 @@ IntermediateResultSize(char *resultId)
Datum Datum
read_intermediate_result(PG_FUNCTION_ARGS) read_intermediate_result(PG_FUNCTION_ARGS)
{ {
text *resultIdText = PG_GETARG_TEXT_P(0); Datum resultId = PG_GETARG_DATUM(0);
char *resultIdString = text_to_cstring(resultIdText);
Datum copyFormatOidDatum = PG_GETARG_DATUM(1); Datum copyFormatOidDatum = PG_GETARG_DATUM(1);
Datum copyFormatLabelDatum = DirectFunctionCall1(enum_out, copyFormatOidDatum); Datum copyFormatLabelDatum = DirectFunctionCall1(enum_out, copyFormatOidDatum);
char *copyFormatLabel = DatumGetCString(copyFormatLabelDatum); char *copyFormatLabel = DatumGetCString(copyFormatLabelDatum);
struct stat fileStat; CheckCitusVersion(ERROR);
TupleDesc tupleDescriptor = NULL; ReadIntermediateResultsIntoFuncOutput(fcinfo, copyFormatLabel, &resultId, 1);
PG_RETURN_DATUM(0);
}
/*
* read_intermediate_result_array returns the set of records in a set of given
* COPY-formatted intermediate result files.
*
* The usage and semantics of this is same as read_intermediate_result(), except
* that its first argument is an array of result ids.
*/
Datum
read_intermediate_result_array(PG_FUNCTION_ARGS)
{
ArrayType *resultIdObject = PG_GETARG_ARRAYTYPE_P(0);
Datum copyFormatOidDatum = PG_GETARG_DATUM(1);
Datum copyFormatLabelDatum = DirectFunctionCall1(enum_out, copyFormatOidDatum);
char *copyFormatLabel = DatumGetCString(copyFormatLabelDatum);
CheckCitusVersion(ERROR); CheckCitusVersion(ERROR);
char *resultFileName = QueryResultFileName(resultIdString); int32 resultCount = ArrayGetNItems(ARR_NDIM(resultIdObject), ARR_DIMS(
int statOK = stat(resultFileName, &fileStat); resultIdObject));
if (statOK != 0) Datum *resultIdArray = DeconstructArrayObject(resultIdObject);
ReadIntermediateResultsIntoFuncOutput(fcinfo, copyFormatLabel,
resultIdArray, resultCount);
PG_RETURN_DATUM(0);
}
/*
* ReadIntermediateResultsIntoFuncOutput reads the given result files and stores
* them at the function's output tuple store. Errors out if any of the result files
* don't exist.
*/
static void
ReadIntermediateResultsIntoFuncOutput(FunctionCallInfo fcinfo, char *copyFormat,
Datum *resultIdArray, int resultCount)
{
TupleDesc tupleDescriptor = NULL;
Tuplestorestate *tupleStore = SetupTuplestore(fcinfo, &tupleDescriptor);
for (int resultIndex = 0; resultIndex < resultCount; resultIndex++)
{ {
ereport(ERROR, (errcode_for_file_access(), char *resultId = TextDatumGetCString(resultIdArray[resultIndex]);
errmsg("result \"%s\" does not exist", resultIdString))); char *resultFileName = QueryResultFileName(resultId);
struct stat fileStat;
int statOK = stat(resultFileName, &fileStat);
if (statOK != 0)
{
ereport(ERROR, (errcode_for_file_access(),
errmsg("result \"%s\" does not exist", resultId)));
}
ReadFileIntoTupleStore(resultFileName, copyFormat, tupleDescriptor, tupleStore);
} }
Tuplestorestate *tupstore = SetupTuplestore(fcinfo, &tupleDescriptor); tuplestore_donestoring(tupleStore);
ReadFileIntoTupleStore(resultFileName, copyFormatLabel, tupleDescriptor, tupstore);
tuplestore_donestoring(tupstore);
return (Datum) 0;
} }

View File

@ -136,6 +136,7 @@ typedef struct MetadataCacheData
Oid citusCatalogNamespaceId; Oid citusCatalogNamespaceId;
Oid copyFormatTypeId; Oid copyFormatTypeId;
Oid readIntermediateResultFuncId; Oid readIntermediateResultFuncId;
Oid readIntermediateResultArrayFuncId;
Oid extraDataContainerFuncId; Oid extraDataContainerFuncId;
Oid workerHashFunctionId; Oid workerHashFunctionId;
Oid anyValueFunctionId; Oid anyValueFunctionId;
@ -2065,6 +2066,26 @@ CitusReadIntermediateResultFuncId(void)
} }
/* return oid of the read_intermediate_results(text[],citus_copy_format) function */
Oid
CitusReadIntermediateResultArrayFuncId(void)
{
if (MetadataCache.readIntermediateResultArrayFuncId == InvalidOid)
{
List *functionNameList = list_make2(makeString("pg_catalog"),
makeString("read_intermediate_results"));
Oid copyFormatTypeOid = CitusCopyFormatTypeId();
Oid paramOids[2] = { TEXTARRAYOID, copyFormatTypeOid };
bool missingOK = false;
MetadataCache.readIntermediateResultArrayFuncId =
LookupFuncName(functionNameList, 2, paramOids, missingOK);
}
return MetadataCache.readIntermediateResultArrayFuncId;
}
/* return oid of the citus.copy_format enum type */ /* return oid of the citus.copy_format enum type */
Oid Oid
CitusCopyFormatTypeId(void) CitusCopyFormatTypeId(void)

View File

@ -99,6 +99,13 @@ static void CheckNodeIsDumpable(Node *node);
static Node * CheckNodeCopyAndSerialization(Node *node); static Node * CheckNodeCopyAndSerialization(Node *node);
static void AdjustReadIntermediateResultCost(RangeTblEntry *rangeTableEntry, static void AdjustReadIntermediateResultCost(RangeTblEntry *rangeTableEntry,
RelOptInfo *relOptInfo); RelOptInfo *relOptInfo);
static void AdjustReadIntermediateResultArrayCost(RangeTblEntry *rangeTableEntry,
RelOptInfo *relOptInfo);
static void AdjustReadIntermediateResultsCostInternal(RelOptInfo *relOptInfo,
List *columnTypes,
int resultIdCount,
Datum *resultIds,
Const *resultFormatConst);
static List * OuterPlanParamsList(PlannerInfo *root); static List * OuterPlanParamsList(PlannerInfo *root);
static List * CopyPlanParamList(List *originalPlanParamList); static List * CopyPlanParamList(List *originalPlanParamList);
static PlannerRestrictionContext * CreateAndPushPlannerRestrictionContext(void); static PlannerRestrictionContext * CreateAndPushPlannerRestrictionContext(void);
@ -1517,6 +1524,7 @@ multi_relation_restriction_hook(PlannerInfo *root, RelOptInfo *relOptInfo,
DistTableCacheEntry *cacheEntry = NULL; DistTableCacheEntry *cacheEntry = NULL;
AdjustReadIntermediateResultCost(rte, relOptInfo); AdjustReadIntermediateResultCost(rte, relOptInfo);
AdjustReadIntermediateResultArrayCost(rte, relOptInfo);
if (rte->rtekind != RTE_RELATION) if (rte->rtekind != RTE_RELATION)
{ {
@ -1578,30 +1586,6 @@ multi_relation_restriction_hook(PlannerInfo *root, RelOptInfo *relOptInfo,
static void static void
AdjustReadIntermediateResultCost(RangeTblEntry *rangeTableEntry, RelOptInfo *relOptInfo) AdjustReadIntermediateResultCost(RangeTblEntry *rangeTableEntry, RelOptInfo *relOptInfo)
{ {
PathTarget *reltarget = relOptInfo->reltarget;
List *pathList = relOptInfo->pathlist;
Path *path = NULL;
RangeTblFunction *rangeTableFunction = NULL;
FuncExpr *funcExpression = NULL;
Const *resultFormatConst = NULL;
Datum resultFormatDatum = 0;
Oid resultFormatId = InvalidOid;
Const *resultIdConst = NULL;
Datum resultIdDatum = 0;
char *resultId = NULL;
int64 resultSize = 0;
ListCell *typeCell = NULL;
bool binaryFormat = false;
double rowCost = 0.;
double rowSizeEstimate = 0;
double rowCountEstimate = 0.;
double ioCost = 0.;
#if PG_VERSION_NUM >= 120000
QualCost funcCost = { 0., 0. };
#else
double funcCost = 0.;
#endif
if (rangeTableEntry->rtekind != RTE_FUNCTION || if (rangeTableEntry->rtekind != RTE_FUNCTION ||
list_length(rangeTableEntry->functions) != 1) list_length(rangeTableEntry->functions) != 1)
{ {
@ -1620,41 +1604,133 @@ AdjustReadIntermediateResultCost(RangeTblEntry *rangeTableEntry, RelOptInfo *rel
return; return;
} }
rangeTableFunction = (RangeTblFunction *) linitial(rangeTableEntry->functions); RangeTblFunction *rangeTableFunction = (RangeTblFunction *) linitial(
funcExpression = (FuncExpr *) rangeTableFunction->funcexpr; rangeTableEntry->functions);
resultIdConst = (Const *) linitial(funcExpression->args); FuncExpr *funcExpression = (FuncExpr *) rangeTableFunction->funcexpr;
Const *resultIdConst = (Const *) linitial(funcExpression->args);
if (!IsA(resultIdConst, Const)) if (!IsA(resultIdConst, Const))
{ {
/* not sure how to interpret non-const */ /* not sure how to interpret non-const */
return; return;
} }
resultIdDatum = resultIdConst->constvalue; Datum resultIdDatum = resultIdConst->constvalue;
resultId = TextDatumGetCString(resultIdDatum);
resultSize = IntermediateResultSize(resultId); Const *resultFormatConst = (Const *) lsecond(funcExpression->args);
if (resultSize < 0)
{
/* result does not exist, will probably error out later on */
return;
}
resultFormatConst = (Const *) lsecond(funcExpression->args);
if (!IsA(resultFormatConst, Const)) if (!IsA(resultFormatConst, Const))
{ {
/* not sure how to interpret non-const */ /* not sure how to interpret non-const */
return; return;
} }
resultFormatDatum = resultFormatConst->constvalue; AdjustReadIntermediateResultsCostInternal(relOptInfo,
resultFormatId = DatumGetObjectId(resultFormatDatum); rangeTableFunction->funccoltypes,
1, &resultIdDatum, resultFormatConst);
}
if (resultFormatId == BinaryCopyFormatId())
/*
* AdjustReadIntermediateResultArrayCost adjusts the row count and total cost
* of a read_intermediate_results(resultIds, format) call based on the file size.
*/
static void
AdjustReadIntermediateResultArrayCost(RangeTblEntry *rangeTableEntry,
RelOptInfo *relOptInfo)
{
Datum *resultIdArray = NULL;
int resultIdCount = 0;
if (rangeTableEntry->rtekind != RTE_FUNCTION ||
list_length(rangeTableEntry->functions) != 1)
{ {
binaryFormat = true; /* avoid more expensive checks below for non-functions */
return;
}
/* subtract 11-byte signature + 8 byte header + 2-byte footer */ if (!CitusHasBeenLoaded() || !CheckCitusVersion(DEBUG5))
resultSize -= 21; {
/* read_intermediate_result may not exist */
return;
}
if (!ContainsReadIntermediateResultArrayFunction((Node *) rangeTableEntry->functions))
{
return;
}
RangeTblFunction *rangeTableFunction =
(RangeTblFunction *) linitial(rangeTableEntry->functions);
FuncExpr *funcExpression = (FuncExpr *) rangeTableFunction->funcexpr;
Const *resultIdConst = (Const *) linitial(funcExpression->args);
if (!IsA(resultIdConst, Const))
{
/* not sure how to interpret non-const */
return;
}
Datum resultIdArrayDatum = resultIdConst->constvalue;
deconstruct_array(DatumGetArrayTypeP(resultIdArrayDatum), TEXTOID, -1, false,
'i', &resultIdArray, NULL, &resultIdCount);
Const *resultFormatConst = (Const *) lsecond(funcExpression->args);
if (!IsA(resultFormatConst, Const))
{
/* not sure how to interpret non-const */
return;
}
AdjustReadIntermediateResultsCostInternal(relOptInfo,
rangeTableFunction->funccoltypes,
resultIdCount, resultIdArray,
resultFormatConst);
}
/*
* AdjustReadIntermediateResultsCostInternal adjusts the row count and total cost
* of reading intermediate results based on file sizes.
*/
static void
AdjustReadIntermediateResultsCostInternal(RelOptInfo *relOptInfo, List *columnTypes,
int resultIdCount, Datum *resultIds,
Const *resultFormatConst)
{
PathTarget *reltarget = relOptInfo->reltarget;
List *pathList = relOptInfo->pathlist;
Path *path = NULL;
double rowCost = 0.;
double rowSizeEstimate = 0;
double rowCountEstimate = 0.;
double ioCost = 0.;
#if PG_VERSION_NUM >= 120000
QualCost funcCost = { 0., 0. };
#else
double funcCost = 0.;
#endif
int64 totalResultSize = 0;
ListCell *typeCell = NULL;
Datum resultFormatDatum = resultFormatConst->constvalue;
Oid resultFormatId = DatumGetObjectId(resultFormatDatum);
bool binaryFormat = (resultFormatId == BinaryCopyFormatId());
for (int index = 0; index < resultIdCount; index++)
{
char *resultId = TextDatumGetCString(resultIds[index]);
int64 resultSize = IntermediateResultSize(resultId);
if (resultSize < 0)
{
/* result does not exist, will probably error out later on */
return;
}
if (binaryFormat)
{
/* subtract 11-byte signature + 8 byte header + 2-byte footer */
totalResultSize -= 21;
}
totalResultSize += resultSize;
} }
/* start with the cost of evaluating quals */ /* start with the cost of evaluating quals */
@ -1666,7 +1742,7 @@ AdjustReadIntermediateResultCost(RangeTblEntry *rangeTableEntry, RelOptInfo *rel
/* add 2 bytes for column count (binary) or line separator (text) */ /* add 2 bytes for column count (binary) or line separator (text) */
rowSizeEstimate += 2; rowSizeEstimate += 2;
foreach(typeCell, rangeTableFunction->funccoltypes) foreach(typeCell, columnTypes)
{ {
Oid columnTypeId = lfirst_oid(typeCell); Oid columnTypeId = lfirst_oid(typeCell);
Oid inputFunctionId = InvalidOid; Oid inputFunctionId = InvalidOid;
@ -1702,10 +1778,10 @@ AdjustReadIntermediateResultCost(RangeTblEntry *rangeTableEntry, RelOptInfo *rel
#endif #endif
/* estimate the number of rows based on the file size and estimated row size */ /* estimate the number of rows based on the file size and estimated row size */
rowCountEstimate = Max(1, (double) resultSize / rowSizeEstimate); rowCountEstimate = Max(1, (double) totalResultSize / rowSizeEstimate);
/* cost of reading the data */ /* cost of reading the data */
ioCost = seq_page_cost * resultSize / BLCKSZ; ioCost = seq_page_cost * totalResultSize / BLCKSZ;
Assert(pathList != NIL); Assert(pathList != NIL);

View File

@ -78,6 +78,8 @@ static bool ErrorHintRequired(const char *errorHint, Query *queryTree);
static bool HasTablesample(Query *queryTree); static bool HasTablesample(Query *queryTree);
static bool HasComplexRangeTableType(Query *queryTree); static bool HasComplexRangeTableType(Query *queryTree);
static bool IsReadIntermediateResultFunction(Node *node); static bool IsReadIntermediateResultFunction(Node *node);
static bool IsReadIntermediateResultArrayFunction(Node *node);
static bool IsFunctionWithOid(Node *node, Oid funcOid);
static bool ExtractFromExpressionWalker(Node *node, static bool ExtractFromExpressionWalker(Node *node,
QualifierWalkerContext *walkerContext); QualifierWalkerContext *walkerContext);
static List * MultiTableNodeList(List *tableEntryList, List *rangeTableList); static List * MultiTableNodeList(List *tableEntryList, List *rangeTableList);
@ -768,18 +770,52 @@ ContainsReadIntermediateResultFunction(Node *node)
} }
/*
* ContainsReadIntermediateResultArrayFunction determines whether an expresion
* tree contains a call to the read_intermediate_results(result_ids, format)
* function.
*/
bool
ContainsReadIntermediateResultArrayFunction(Node *node)
{
return FindNodeCheck(node, IsReadIntermediateResultArrayFunction);
}
/* /*
* IsReadIntermediateResultFunction determines whether a given node is a function call * IsReadIntermediateResultFunction determines whether a given node is a function call
* to the read_intermediate_result function. * to the read_intermediate_result function.
*/ */
static bool static bool
IsReadIntermediateResultFunction(Node *node) IsReadIntermediateResultFunction(Node *node)
{
return IsFunctionWithOid(node, CitusReadIntermediateResultFuncId());
}
/*
* IsReadIntermediateResultArrayFunction determines whether a given node is a
* function call to the read_intermediate_results(result_ids, format) function.
*/
static bool
IsReadIntermediateResultArrayFunction(Node *node)
{
return IsFunctionWithOid(node, CitusReadIntermediateResultArrayFuncId());
}
/*
* IsFunctionWithOid determines whether a given node is a function call
* to the read_intermediate_result function.
*/
static bool
IsFunctionWithOid(Node *node, Oid funcOid)
{ {
if (IsA(node, FuncExpr)) if (IsA(node, FuncExpr))
{ {
FuncExpr *funcExpr = (FuncExpr *) node; FuncExpr *funcExpr = (FuncExpr *) node;
if (funcExpr->funcid == CitusReadIntermediateResultFuncId()) if (funcExpr->funcid == funcOid)
{ {
return true; return true;
} }

View File

@ -1,3 +1,5 @@
#include "udfs/read_intermediate_results/9.2-1.sql"
ALTER TABLE pg_catalog.pg_dist_colocation ADD distributioncolumncollation oid; ALTER TABLE pg_catalog.pg_dist_colocation ADD distributioncolumncollation oid;
UPDATE pg_catalog.pg_dist_colocation dc SET distributioncolumncollation = t.typcollation UPDATE pg_catalog.pg_dist_colocation dc SET distributioncolumncollation = t.typcollation
FROM pg_catalog.pg_type t WHERE t.oid = dc.distributioncolumntype; FROM pg_catalog.pg_type t WHERE t.oid = dc.distributioncolumntype;

View File

@ -0,0 +1,8 @@
CREATE OR REPLACE FUNCTION pg_catalog.read_intermediate_results(
result_ids text[],
format pg_catalog.citus_copy_format default 'csv')
RETURNS SETOF record
LANGUAGE C STRICT VOLATILE PARALLEL SAFE
AS 'MODULE_PATHNAME', $$read_intermediate_result_array$$;
COMMENT ON FUNCTION pg_catalog.read_intermediate_results(text[],pg_catalog.citus_copy_format)
IS 'read a set files and return them as a set of records';

View File

@ -0,0 +1,8 @@
CREATE OR REPLACE FUNCTION pg_catalog.read_intermediate_results(
result_ids text[],
format pg_catalog.citus_copy_format default 'csv')
RETURNS SETOF record
LANGUAGE C STRICT VOLATILE PARALLEL SAFE
AS 'MODULE_PATHNAME', $$read_intermediate_result_array$$;
COMMENT ON FUNCTION pg_catalog.read_intermediate_results(text[],pg_catalog.citus_copy_format)
IS 'read a set files and return them as a set of records';

View File

@ -188,6 +188,7 @@ extern Oid CitusCopyFormatTypeId(void);
/* function oids */ /* function oids */
extern Oid CitusReadIntermediateResultFuncId(void); extern Oid CitusReadIntermediateResultFuncId(void);
Oid CitusReadIntermediateResultArrayFuncId(void);
extern Oid CitusExtraDataContainerFuncId(void); extern Oid CitusExtraDataContainerFuncId(void);
extern Oid CitusWorkerHashFunctionId(void); extern Oid CitusWorkerHashFunctionId(void);
extern Oid CitusAnyValueFunctionId(void); extern Oid CitusAnyValueFunctionId(void);

View File

@ -193,6 +193,7 @@ extern bool FindNodeCheckInRangeTableList(List *rtable, bool (*check)(Node *));
extern bool IsDistributedTableRTE(Node *node); extern bool IsDistributedTableRTE(Node *node);
extern bool QueryContainsDistributedTableRTE(Query *query); extern bool QueryContainsDistributedTableRTE(Query *query);
extern bool ContainsReadIntermediateResultFunction(Node *node); extern bool ContainsReadIntermediateResultFunction(Node *node);
extern bool ContainsReadIntermediateResultArrayFunction(Node *node);
extern char * FindIntermediateResultIdIfExists(RangeTblEntry *rte); extern char * FindIntermediateResultIdIfExists(RangeTblEntry *rte);
extern MultiNode * ParentNode(MultiNode *multiNode); extern MultiNode * ParentNode(MultiNode *multiNode);
extern MultiNode * ChildNode(MultiUnaryNode *multiNode); extern MultiNode * ChildNode(MultiUnaryNode *multiNode);

View File

@ -122,7 +122,7 @@ SELECT create_intermediate_result('squares', 'SELECT s, s*s FROM generate_series
(1 row) (1 row)
SELECT * FROM read_intermediate_result('squares', 'csv') AS res (x int, x2 int); SELECT * FROM read_intermediate_result('squares', 'csv') AS res (x int, x2 int);
ERROR: invalid input syntax for integer: "PGCOPY" ERROR: invalid input syntax for type integer: "PGCOPY"
END; END;
-- try a composite type -- try a composite type
CREATE TYPE intermediate_results.square_type AS (x text, x2 int); CREATE TYPE intermediate_results.square_type AS (x text, x2 int);
@ -201,10 +201,10 @@ SELECT create_intermediate_result('squares', 'SELECT s, s*s FROM generate_series
632 632
(1 row) (1 row)
EXPLAIN (COSTS OFF) SELECT * FROM read_intermediate_result('squares', 'binary') AS res (x int, x2 int); EXPLAIN (COSTS ON) SELECT * FROM read_intermediate_result('squares', 'binary') AS res (x int, x2 int);
QUERY PLAN QUERY PLAN
----------------------------------------------- -----------------------------------------------------------------------------------
Function Scan on read_intermediate_result res Function Scan on read_intermediate_result res (cost=0.00..4.55 rows=632 width=8)
(1 row) (1 row)
-- less accurate results for variable types -- less accurate results for variable types
@ -214,10 +214,10 @@ SELECT create_intermediate_result('hellos', $$SELECT s, 'hello-'||s FROM generat
63 63
(1 row) (1 row)
EXPLAIN (COSTS OFF) SELECT * FROM read_intermediate_result('hellos', 'binary') AS res (x int, y text); EXPLAIN (COSTS ON) SELECT * FROM read_intermediate_result('hellos', 'binary') AS res (x int, y text);
QUERY PLAN QUERY PLAN
----------------------------------------------- -----------------------------------------------------------------------------------
Function Scan on read_intermediate_result res Function Scan on read_intermediate_result res (cost=0.00..0.32 rows=30 width=36)
(1 row) (1 row)
-- not very accurate results for text encoding -- not very accurate results for text encoding
@ -227,10 +227,10 @@ SELECT create_intermediate_result('stored_squares', 'SELECT square FROM stored_s
4 4
(1 row) (1 row)
EXPLAIN (COSTS OFF) SELECT * FROM read_intermediate_result('stored_squares', 'text') AS res (s intermediate_results.square_type); EXPLAIN (COSTS ON) SELECT * FROM read_intermediate_result('stored_squares', 'text') AS res (s intermediate_results.square_type);
QUERY PLAN QUERY PLAN
----------------------------------------------- ----------------------------------------------------------------------------------
Function Scan on read_intermediate_result res Function Scan on read_intermediate_result res (cost=0.00..0.01 rows=1 width=32)
(1 row) (1 row)
END; END;
@ -259,6 +259,160 @@ select broadcast_intermediate_result('a', 'prepare foo as select 1');
ERROR: cannot execute utility commands ERROR: cannot execute utility commands
select create_intermediate_result('a', 'create table foo(int serial)'); select create_intermediate_result('a', 'create table foo(int serial)');
ERROR: cannot execute utility commands ERROR: cannot execute utility commands
--
-- read_intermediate_results
--
BEGIN;
SELECT create_intermediate_result('squares_1', 'SELECT s, s*s FROM generate_series(1,3) s'),
create_intermediate_result('squares_2', 'SELECT s, s*s FROM generate_series(4,6) s'),
create_intermediate_result('squares_3', 'SELECT s, s*s FROM generate_series(7,10) s');
create_intermediate_result | create_intermediate_result | create_intermediate_result
----------------------------+----------------------------+----------------------------
3 | 3 | 4
(1 row)
SELECT count(*) FROM read_intermediate_results(ARRAY[]::text[], 'binary') AS res (x int, x2 int);
count
-------
0
(1 row)
SELECT * FROM read_intermediate_results(ARRAY['squares_1']::text[], 'binary') AS res (x int, x2 int);
x | x2
---+----
1 | 1
2 | 4
3 | 9
(3 rows)
SELECT * FROM read_intermediate_results(ARRAY['squares_1', 'squares_2', 'squares_3']::text[], 'binary') AS res (x int, x2 int);
x | x2
----+-----
1 | 1
2 | 4
3 | 9
4 | 16
5 | 25
6 | 36
7 | 49
8 | 64
9 | 81
10 | 100
(10 rows)
COMMIT;
-- in separate transactions, the result is no longer available
SELECT create_intermediate_result('squares_1', 'SELECT s, s*s FROM generate_series(1,5) s');
create_intermediate_result
----------------------------
5
(1 row)
SELECT * FROM read_intermediate_results(ARRAY['squares_1']::text[], 'binary') AS res (x int, x2 int);
ERROR: result "squares_1" does not exist
-- error behaviour, and also check that results are deleted on rollback
BEGIN;
SELECT create_intermediate_result('squares_1', 'SELECT s, s*s FROM generate_series(1,3) s');
create_intermediate_result
----------------------------
3
(1 row)
SAVEPOINT s1;
SELECT * FROM read_intermediate_results(ARRAY['notexistingfile', 'squares_1'], 'binary') AS res (x int, x2 int);
ERROR: result "notexistingfile" does not exist
ROLLBACK TO SAVEPOINT s1;
SELECT * FROM read_intermediate_results(ARRAY['squares_1', 'notexistingfile'], 'binary') AS res (x int, x2 int);
ERROR: result "notexistingfile" does not exist
ROLLBACK TO SAVEPOINT s1;
SELECT * FROM read_intermediate_results(ARRAY['squares_1', NULL], 'binary') AS res (x int, x2 int);
ERROR: null array element not allowed in this context
ROLLBACK TO SAVEPOINT s1;
-- after rollbacks we should be able to run vail read_intermediate_results still.
SELECT count(*) FROM read_intermediate_results(ARRAY['squares_1']::text[], 'binary') AS res (x int, x2 int);
count
-------
3
(1 row)
SELECT count(*) FROM read_intermediate_results(ARRAY[]::text[], 'binary') AS res (x int, x2 int);
count
-------
0
(1 row)
END;
SELECT * FROM read_intermediate_results(ARRAY['squares_1']::text[], 'binary') AS res (x int, x2 int);
ERROR: result "squares_1" does not exist
-- Test non-binary format: read_intermediate_results(..., 'text')
BEGIN;
-- ROW(...) types switch the output format to text
SELECT broadcast_intermediate_result('stored_squares_1',
'SELECT s, s*s, ROW(1::text, 2) FROM generate_series(1,3) s'),
broadcast_intermediate_result('stored_squares_2',
'SELECT s, s*s, ROW(2::text, 3) FROM generate_series(4,6) s');
broadcast_intermediate_result | broadcast_intermediate_result
-------------------------------+-------------------------------
3 | 3
(1 row)
-- query the intermediate result in a router query using text format
SELECT * FROM interesting_squares JOIN (
SELECT * FROM
read_intermediate_results(ARRAY['stored_squares_1', 'stored_squares_2'], 'binary') AS res (x int, x2 int, z intermediate_results.square_type)
) squares
ON (squares.x::text = interested_in) WHERE user_id = 'jon' ORDER BY 1,2;
user_id | interested_in | x | x2 | z
---------+---------------+---+----+-------
jon | 2 | 2 | 4 | (1,2)
jon | 5 | 5 | 25 | (2,3)
(2 rows)
END;
-- Cost estimation for read_intermediate_results
BEGIN;
-- almost accurate row count estimates for primitive types
SELECT create_intermediate_result('squares_1', 'SELECT s, s*s FROM generate_series(1,632) s'),
create_intermediate_result('squares_2', 'SELECT s, s*s FROM generate_series(633,1024) s');
create_intermediate_result | create_intermediate_result
----------------------------+----------------------------
632 | 392
(1 row)
EXPLAIN (COSTS ON) SELECT * FROM read_intermediate_results(ARRAY['squares_1', 'squares_2'], 'binary') AS res (x int, x2 int);
QUERY PLAN
-------------------------------------------------------------------------------------
Function Scan on read_intermediate_results res (cost=0.00..7.37 rows=1024 width=8)
(1 row)
-- less accurate results for variable types
SELECT create_intermediate_result('hellos_1', $$SELECT s, 'hello-'||s FROM generate_series(1,63) s$$),
create_intermediate_result('hellos_2', $$SELECT s, 'hello-'||s FROM generate_series(64,129) s$$);
create_intermediate_result | create_intermediate_result
----------------------------+----------------------------
63 | 66
(1 row)
EXPLAIN (COSTS ON) SELECT * FROM read_intermediate_results(ARRAY['hellos_1', 'hellos_2'], 'binary') AS res (x int, y text);
QUERY PLAN
------------------------------------------------------------------------------------
Function Scan on read_intermediate_results res (cost=0.00..0.66 rows=62 width=36)
(1 row)
-- not very accurate results for text encoding
SELECT create_intermediate_result('stored_squares', 'SELECT square FROM stored_squares');
create_intermediate_result
----------------------------
4
(1 row)
EXPLAIN (COSTS ON) SELECT * FROM read_intermediate_results(ARRAY['stored_squares'], 'text') AS res (s intermediate_results.square_type);
QUERY PLAN
-----------------------------------------------------------------------------------
Function Scan on read_intermediate_results res (cost=0.00..0.01 rows=1 width=32)
(1 row)
END;
DROP SCHEMA intermediate_results CASCADE; DROP SCHEMA intermediate_results CASCADE;
NOTICE: drop cascades to 5 other objects NOTICE: drop cascades to 5 other objects
DETAIL: drop cascades to table interesting_squares DETAIL: drop cascades to table interesting_squares

View File

@ -121,15 +121,15 @@ END;
BEGIN; BEGIN;
-- accurate row count estimates for primitive types -- accurate row count estimates for primitive types
SELECT create_intermediate_result('squares', 'SELECT s, s*s FROM generate_series(1,632) s'); SELECT create_intermediate_result('squares', 'SELECT s, s*s FROM generate_series(1,632) s');
EXPLAIN (COSTS OFF) SELECT * FROM read_intermediate_result('squares', 'binary') AS res (x int, x2 int); EXPLAIN (COSTS ON) SELECT * FROM read_intermediate_result('squares', 'binary') AS res (x int, x2 int);
-- less accurate results for variable types -- less accurate results for variable types
SELECT create_intermediate_result('hellos', $$SELECT s, 'hello-'||s FROM generate_series(1,63) s$$); SELECT create_intermediate_result('hellos', $$SELECT s, 'hello-'||s FROM generate_series(1,63) s$$);
EXPLAIN (COSTS OFF) SELECT * FROM read_intermediate_result('hellos', 'binary') AS res (x int, y text); EXPLAIN (COSTS ON) SELECT * FROM read_intermediate_result('hellos', 'binary') AS res (x int, y text);
-- not very accurate results for text encoding -- not very accurate results for text encoding
SELECT create_intermediate_result('stored_squares', 'SELECT square FROM stored_squares'); SELECT create_intermediate_result('stored_squares', 'SELECT square FROM stored_squares');
EXPLAIN (COSTS OFF) SELECT * FROM read_intermediate_result('stored_squares', 'text') AS res (s intermediate_results.square_type); EXPLAIN (COSTS ON) SELECT * FROM read_intermediate_result('stored_squares', 'text') AS res (s intermediate_results.square_type);
END; END;
-- pipe query output into a result file and create a table to check the result -- pipe query output into a result file and create a table to check the result
@ -148,4 +148,74 @@ select broadcast_intermediate_result('a', 'create table foo(int serial)');
select broadcast_intermediate_result('a', 'prepare foo as select 1'); select broadcast_intermediate_result('a', 'prepare foo as select 1');
select create_intermediate_result('a', 'create table foo(int serial)'); select create_intermediate_result('a', 'create table foo(int serial)');
--
-- read_intermediate_results
--
BEGIN;
SELECT create_intermediate_result('squares_1', 'SELECT s, s*s FROM generate_series(1,3) s'),
create_intermediate_result('squares_2', 'SELECT s, s*s FROM generate_series(4,6) s'),
create_intermediate_result('squares_3', 'SELECT s, s*s FROM generate_series(7,10) s');
SELECT count(*) FROM read_intermediate_results(ARRAY[]::text[], 'binary') AS res (x int, x2 int);
SELECT * FROM read_intermediate_results(ARRAY['squares_1']::text[], 'binary') AS res (x int, x2 int);
SELECT * FROM read_intermediate_results(ARRAY['squares_1', 'squares_2', 'squares_3']::text[], 'binary') AS res (x int, x2 int);
COMMIT;
-- in separate transactions, the result is no longer available
SELECT create_intermediate_result('squares_1', 'SELECT s, s*s FROM generate_series(1,5) s');
SELECT * FROM read_intermediate_results(ARRAY['squares_1']::text[], 'binary') AS res (x int, x2 int);
-- error behaviour, and also check that results are deleted on rollback
BEGIN;
SELECT create_intermediate_result('squares_1', 'SELECT s, s*s FROM generate_series(1,3) s');
SAVEPOINT s1;
SELECT * FROM read_intermediate_results(ARRAY['notexistingfile', 'squares_1'], 'binary') AS res (x int, x2 int);
ROLLBACK TO SAVEPOINT s1;
SELECT * FROM read_intermediate_results(ARRAY['squares_1', 'notexistingfile'], 'binary') AS res (x int, x2 int);
ROLLBACK TO SAVEPOINT s1;
SELECT * FROM read_intermediate_results(ARRAY['squares_1', NULL], 'binary') AS res (x int, x2 int);
ROLLBACK TO SAVEPOINT s1;
-- after rollbacks we should be able to run vail read_intermediate_results still.
SELECT count(*) FROM read_intermediate_results(ARRAY['squares_1']::text[], 'binary') AS res (x int, x2 int);
SELECT count(*) FROM read_intermediate_results(ARRAY[]::text[], 'binary') AS res (x int, x2 int);
END;
SELECT * FROM read_intermediate_results(ARRAY['squares_1']::text[], 'binary') AS res (x int, x2 int);
-- Test non-binary format: read_intermediate_results(..., 'text')
BEGIN;
-- ROW(...) types switch the output format to text
SELECT broadcast_intermediate_result('stored_squares_1',
'SELECT s, s*s, ROW(1::text, 2) FROM generate_series(1,3) s'),
broadcast_intermediate_result('stored_squares_2',
'SELECT s, s*s, ROW(2::text, 3) FROM generate_series(4,6) s');
-- query the intermediate result in a router query using text format
SELECT * FROM interesting_squares JOIN (
SELECT * FROM
read_intermediate_results(ARRAY['stored_squares_1', 'stored_squares_2'], 'binary') AS res (x int, x2 int, z intermediate_results.square_type)
) squares
ON (squares.x::text = interested_in) WHERE user_id = 'jon' ORDER BY 1,2;
END;
-- Cost estimation for read_intermediate_results
BEGIN;
-- almost accurate row count estimates for primitive types
SELECT create_intermediate_result('squares_1', 'SELECT s, s*s FROM generate_series(1,632) s'),
create_intermediate_result('squares_2', 'SELECT s, s*s FROM generate_series(633,1024) s');
EXPLAIN (COSTS ON) SELECT * FROM read_intermediate_results(ARRAY['squares_1', 'squares_2'], 'binary') AS res (x int, x2 int);
-- less accurate results for variable types
SELECT create_intermediate_result('hellos_1', $$SELECT s, 'hello-'||s FROM generate_series(1,63) s$$),
create_intermediate_result('hellos_2', $$SELECT s, 'hello-'||s FROM generate_series(64,129) s$$);
EXPLAIN (COSTS ON) SELECT * FROM read_intermediate_results(ARRAY['hellos_1', 'hellos_2'], 'binary') AS res (x int, y text);
-- not very accurate results for text encoding
SELECT create_intermediate_result('stored_squares', 'SELECT square FROM stored_squares');
EXPLAIN (COSTS ON) SELECT * FROM read_intermediate_results(ARRAY['stored_squares'], 'text') AS res (s intermediate_results.square_type);
END;
DROP SCHEMA intermediate_results CASCADE; DROP SCHEMA intermediate_results CASCADE;