From 113bd1e5f11eb92838dd6d2f3547a9667cf9e926 Mon Sep 17 00:00:00 2001 From: Hadi Moshayedi Date: Fri, 13 Dec 2019 15:17:27 -0800 Subject: [PATCH 1/2] Implement read_intermediate_results --- .../executor/intermediate_results.c | 82 ++++++++++--- .../distributed/sql/citus--9.1-1--9.2-1.sql | 2 + .../udfs/read_intermediate_results/9.2-1.sql | 8 ++ .../udfs/read_intermediate_results/latest.sql | 8 ++ .../regress/expected/intermediate_results.out | 112 +++++++++++++++++- src/test/regress/sql/intermediate_results.sql | 54 +++++++++ 6 files changed, 248 insertions(+), 18 deletions(-) create mode 100644 src/backend/distributed/sql/udfs/read_intermediate_results/9.2-1.sql create mode 100644 src/backend/distributed/sql/udfs/read_intermediate_results/latest.sql diff --git a/src/backend/distributed/executor/intermediate_results.c b/src/backend/distributed/executor/intermediate_results.c index cdcc53005..688eef466 100644 --- a/src/backend/distributed/executor/intermediate_results.c +++ b/src/backend/distributed/executor/intermediate_results.c @@ -92,10 +92,14 @@ static void RemoteFileDestReceiverDestroy(DestReceiver *destReceiver); static char * CreateIntermediateResultsDirectory(void); static char * IntermediateResultsDirectory(void); static char * QueryResultFileName(const char *resultId); - +static void ReadIntermediateResultsIntoFuncOutput(FunctionCallInfo fcinfo, + char *copyFormat, + Datum *resultIdArray, + int resultCount); /* exports for SQL callable functions */ PG_FUNCTION_INFO_V1(read_intermediate_result); +PG_FUNCTION_INFO_V1(read_intermediate_result_array); PG_FUNCTION_INFO_V1(broadcast_intermediate_result); PG_FUNCTION_INFO_V1(create_intermediate_result); @@ -693,31 +697,75 @@ IntermediateResultSize(char *resultId) Datum read_intermediate_result(PG_FUNCTION_ARGS) { - text *resultIdText = PG_GETARG_TEXT_P(0); - char *resultIdString = text_to_cstring(resultIdText); + Datum resultId = PG_GETARG_DATUM(0); Datum copyFormatOidDatum = PG_GETARG_DATUM(1); Datum 
copyFormatLabelDatum = DirectFunctionCall1(enum_out, copyFormatOidDatum); char *copyFormatLabel = DatumGetCString(copyFormatLabelDatum); - struct stat fileStat; + CheckCitusVersion(ERROR); - TupleDesc tupleDescriptor = NULL; + ReadIntermediateResultsIntoFuncOutput(fcinfo, copyFormatLabel, &resultId, 1); + + PG_RETURN_DATUM(0); +} + + +/* + * read_intermediate_result_array returns the set of records in a set of given + * COPY-formatted intermediate result files. + * + * The usage and semantics of this are the same as read_intermediate_result(), except + * that its first argument is an array of result ids. + */ +Datum +read_intermediate_result_array(PG_FUNCTION_ARGS) +{ + ArrayType *resultIdObject = PG_GETARG_ARRAYTYPE_P(0); + Datum copyFormatOidDatum = PG_GETARG_DATUM(1); + + Datum copyFormatLabelDatum = DirectFunctionCall1(enum_out, copyFormatOidDatum); + char *copyFormatLabel = DatumGetCString(copyFormatLabelDatum); CheckCitusVersion(ERROR); - char *resultFileName = QueryResultFileName(resultIdString); - int statOK = stat(resultFileName, &fileStat); - if (statOK != 0) + int32 resultCount = ArrayGetNItems(ARR_NDIM(resultIdObject), ARR_DIMS( + resultIdObject)); + Datum *resultIdArray = DeconstructArrayObject(resultIdObject); + + ReadIntermediateResultsIntoFuncOutput(fcinfo, copyFormatLabel, + resultIdArray, resultCount); + + PG_RETURN_DATUM(0); +} + + +/* + * ReadIntermediateResultsIntoFuncOutput reads the given result files and stores + * them in the function's output tuple store. Errors out if any of the result files + * don't exist. 
+ */ +static void +ReadIntermediateResultsIntoFuncOutput(FunctionCallInfo fcinfo, char *copyFormat, + Datum *resultIdArray, int resultCount) +{ + TupleDesc tupleDescriptor = NULL; + Tuplestorestate *tupleStore = SetupTuplestore(fcinfo, &tupleDescriptor); + + for (int resultIndex = 0; resultIndex < resultCount; resultIndex++) { - ereport(ERROR, (errcode_for_file_access(), - errmsg("result \"%s\" does not exist", resultIdString))); + char *resultId = TextDatumGetCString(resultIdArray[resultIndex]); + char *resultFileName = QueryResultFileName(resultId); + struct stat fileStat; + + int statOK = stat(resultFileName, &fileStat); + if (statOK != 0) + { + ereport(ERROR, (errcode_for_file_access(), + errmsg("result \"%s\" does not exist", resultId))); + } + + ReadFileIntoTupleStore(resultFileName, copyFormat, tupleDescriptor, tupleStore); } - Tuplestorestate *tupstore = SetupTuplestore(fcinfo, &tupleDescriptor); - - ReadFileIntoTupleStore(resultFileName, copyFormatLabel, tupleDescriptor, tupstore); - - tuplestore_donestoring(tupstore); - - return (Datum) 0; + tuplestore_donestoring(tupleStore); } diff --git a/src/backend/distributed/sql/citus--9.1-1--9.2-1.sql b/src/backend/distributed/sql/citus--9.1-1--9.2-1.sql index 70dfb74b5..4149f26f8 100644 --- a/src/backend/distributed/sql/citus--9.1-1--9.2-1.sql +++ b/src/backend/distributed/sql/citus--9.1-1--9.2-1.sql @@ -1,3 +1,5 @@ +#include "udfs/read_intermediate_results/9.2-1.sql" + ALTER TABLE pg_catalog.pg_dist_colocation ADD distributioncolumncollation oid; UPDATE pg_catalog.pg_dist_colocation dc SET distributioncolumncollation = t.typcollation FROM pg_catalog.pg_type t WHERE t.oid = dc.distributioncolumntype; diff --git a/src/backend/distributed/sql/udfs/read_intermediate_results/9.2-1.sql b/src/backend/distributed/sql/udfs/read_intermediate_results/9.2-1.sql new file mode 100644 index 000000000..15e01ac64 --- /dev/null +++ b/src/backend/distributed/sql/udfs/read_intermediate_results/9.2-1.sql @@ -0,0 +1,8 @@ +CREATE OR 
REPLACE FUNCTION pg_catalog.read_intermediate_results( + result_ids text[], + format pg_catalog.citus_copy_format default 'csv') +RETURNS SETOF record +LANGUAGE C STRICT VOLATILE PARALLEL SAFE +AS 'MODULE_PATHNAME', $$read_intermediate_result_array$$; +COMMENT ON FUNCTION pg_catalog.read_intermediate_results(text[],pg_catalog.citus_copy_format) +IS 'read a set files and return them as a set of records'; diff --git a/src/backend/distributed/sql/udfs/read_intermediate_results/latest.sql b/src/backend/distributed/sql/udfs/read_intermediate_results/latest.sql new file mode 100644 index 000000000..15e01ac64 --- /dev/null +++ b/src/backend/distributed/sql/udfs/read_intermediate_results/latest.sql @@ -0,0 +1,8 @@ +CREATE OR REPLACE FUNCTION pg_catalog.read_intermediate_results( + result_ids text[], + format pg_catalog.citus_copy_format default 'csv') +RETURNS SETOF record +LANGUAGE C STRICT VOLATILE PARALLEL SAFE +AS 'MODULE_PATHNAME', $$read_intermediate_result_array$$; +COMMENT ON FUNCTION pg_catalog.read_intermediate_results(text[],pg_catalog.citus_copy_format) +IS 'read a set files and return them as a set of records'; diff --git a/src/test/regress/expected/intermediate_results.out b/src/test/regress/expected/intermediate_results.out index eb14e86d6..93dcc8e69 100644 --- a/src/test/regress/expected/intermediate_results.out +++ b/src/test/regress/expected/intermediate_results.out @@ -122,7 +122,7 @@ SELECT create_intermediate_result('squares', 'SELECT s, s*s FROM generate_series (1 row) SELECT * FROM read_intermediate_result('squares', 'csv') AS res (x int, x2 int); -ERROR: invalid input syntax for integer: "PGCOPY" +ERROR: invalid input syntax for type integer: "PGCOPY" END; -- try a composite type CREATE TYPE intermediate_results.square_type AS (x text, x2 int); @@ -259,6 +259,116 @@ select broadcast_intermediate_result('a', 'prepare foo as select 1'); ERROR: cannot execute utility commands select create_intermediate_result('a', 'create table foo(int serial)'); 
ERROR: cannot execute utility commands +-- +-- read_intermediate_results +-- +BEGIN; +SELECT create_intermediate_result('squares_1', 'SELECT s, s*s FROM generate_series(1,3) s'), + create_intermediate_result('squares_2', 'SELECT s, s*s FROM generate_series(4,6) s'), + create_intermediate_result('squares_3', 'SELECT s, s*s FROM generate_series(7,10) s'); + create_intermediate_result | create_intermediate_result | create_intermediate_result +----------------------------+----------------------------+---------------------------- + 3 | 3 | 4 +(1 row) + +SELECT count(*) FROM read_intermediate_results(ARRAY[]::text[], 'binary') AS res (x int, x2 int); + count +------- + 0 +(1 row) + +SELECT * FROM read_intermediate_results(ARRAY['squares_1']::text[], 'binary') AS res (x int, x2 int); + x | x2 +---+---- + 1 | 1 + 2 | 4 + 3 | 9 +(3 rows) + +SELECT * FROM read_intermediate_results(ARRAY['squares_1', 'squares_2', 'squares_3']::text[], 'binary') AS res (x int, x2 int); + x | x2 +----+----- + 1 | 1 + 2 | 4 + 3 | 9 + 4 | 16 + 5 | 25 + 6 | 36 + 7 | 49 + 8 | 64 + 9 | 81 + 10 | 100 +(10 rows) + +COMMIT; +-- in separate transactions, the result is no longer available +SELECT create_intermediate_result('squares_1', 'SELECT s, s*s FROM generate_series(1,5) s'); + create_intermediate_result +---------------------------- + 5 +(1 row) + +SELECT * FROM read_intermediate_results(ARRAY['squares_1']::text[], 'binary') AS res (x int, x2 int); +ERROR: result "squares_1" does not exist +-- error behaviour, and also check that results are deleted on rollback +BEGIN; +SELECT create_intermediate_result('squares_1', 'SELECT s, s*s FROM generate_series(1,3) s'); + create_intermediate_result +---------------------------- + 3 +(1 row) + +SAVEPOINT s1; +SELECT * FROM read_intermediate_results(ARRAY['notexistingfile', 'squares_1'], 'binary') AS res (x int, x2 int); +ERROR: result "notexistingfile" does not exist +ROLLBACK TO SAVEPOINT s1; +SELECT * FROM read_intermediate_results(ARRAY['squares_1', 
'notexistingfile'], 'binary') AS res (x int, x2 int); +ERROR: result "notexistingfile" does not exist +ROLLBACK TO SAVEPOINT s1; +SELECT * FROM read_intermediate_results(ARRAY['squares_1', NULL], 'binary') AS res (x int, x2 int); +ERROR: null array element not allowed in this context +ROLLBACK TO SAVEPOINT s1; +-- after rollbacks we should be able to run vail read_intermediate_results still. +SELECT count(*) FROM read_intermediate_results(ARRAY['squares_1']::text[], 'binary') AS res (x int, x2 int); + count +------- + 3 +(1 row) + +SELECT count(*) FROM read_intermediate_results(ARRAY[]::text[], 'binary') AS res (x int, x2 int); + count +------- + 0 +(1 row) + +END; +SELECT * FROM read_intermediate_results(ARRAY['squares_1']::text[], 'binary') AS res (x int, x2 int); +ERROR: result "squares_1" does not exist +-- Test non-binary format: read_intermediate_results(..., 'text') +BEGIN; +-- ROW(...) types switch the output format to text +SELECT broadcast_intermediate_result('stored_squares_1', + 'SELECT s, s*s, ROW(1::text, 2) FROM generate_series(1,3) s'), + broadcast_intermediate_result('stored_squares_2', + 'SELECT s, s*s, ROW(2::text, 3) FROM generate_series(4,6) s'); + broadcast_intermediate_result | broadcast_intermediate_result +-------------------------------+------------------------------- + 3 | 3 +(1 row) + +-- query the intermediate result in a router query using text format +SELECT * FROM interesting_squares JOIN ( + SELECT * FROM + read_intermediate_results(ARRAY['stored_squares_1', 'stored_squares_2'], 'binary') AS res (x int, x2 int, z intermediate_results.square_type) +) squares +ON (squares.x::text = interested_in) WHERE user_id = 'jon' ORDER BY 1,2; + user_id | interested_in | x | x2 | z +---------+---------------+---+----+------- + jon | 2 | 2 | 4 | (1,2) + jon | 5 | 5 | 25 | (2,3) +(2 rows) + +END; DROP SCHEMA intermediate_results CASCADE; NOTICE: drop cascades to 5 other objects DETAIL: drop cascades to table interesting_squares diff --git 
a/src/test/regress/sql/intermediate_results.sql b/src/test/regress/sql/intermediate_results.sql index 777d5dc9b..bed8c7901 100644 --- a/src/test/regress/sql/intermediate_results.sql +++ b/src/test/regress/sql/intermediate_results.sql @@ -148,4 +148,58 @@ select broadcast_intermediate_result('a', 'create table foo(int serial)'); select broadcast_intermediate_result('a', 'prepare foo as select 1'); select create_intermediate_result('a', 'create table foo(int serial)'); +-- +-- read_intermediate_results +-- + +BEGIN; +SELECT create_intermediate_result('squares_1', 'SELECT s, s*s FROM generate_series(1,3) s'), + create_intermediate_result('squares_2', 'SELECT s, s*s FROM generate_series(4,6) s'), + create_intermediate_result('squares_3', 'SELECT s, s*s FROM generate_series(7,10) s'); + +SELECT count(*) FROM read_intermediate_results(ARRAY[]::text[], 'binary') AS res (x int, x2 int); +SELECT * FROM read_intermediate_results(ARRAY['squares_1']::text[], 'binary') AS res (x int, x2 int); +SELECT * FROM read_intermediate_results(ARRAY['squares_1', 'squares_2', 'squares_3']::text[], 'binary') AS res (x int, x2 int); + +COMMIT; + +-- in separate transactions, the result is no longer available +SELECT create_intermediate_result('squares_1', 'SELECT s, s*s FROM generate_series(1,5) s'); +SELECT * FROM read_intermediate_results(ARRAY['squares_1']::text[], 'binary') AS res (x int, x2 int); + +-- error behaviour, and also check that results are deleted on rollback +BEGIN; +SELECT create_intermediate_result('squares_1', 'SELECT s, s*s FROM generate_series(1,3) s'); +SAVEPOINT s1; +SELECT * FROM read_intermediate_results(ARRAY['notexistingfile', 'squares_1'], 'binary') AS res (x int, x2 int); +ROLLBACK TO SAVEPOINT s1; +SELECT * FROM read_intermediate_results(ARRAY['squares_1', 'notexistingfile'], 'binary') AS res (x int, x2 int); +ROLLBACK TO SAVEPOINT s1; +SELECT * FROM read_intermediate_results(ARRAY['squares_1', NULL], 'binary') AS res (x int, x2 int); +ROLLBACK TO SAVEPOINT s1; 
+-- after rollbacks we should be able to run vail read_intermediate_results still. +SELECT count(*) FROM read_intermediate_results(ARRAY['squares_1']::text[], 'binary') AS res (x int, x2 int); +SELECT count(*) FROM read_intermediate_results(ARRAY[]::text[], 'binary') AS res (x int, x2 int); +END; + +SELECT * FROM read_intermediate_results(ARRAY['squares_1']::text[], 'binary') AS res (x int, x2 int); + +-- Test non-binary format: read_intermediate_results(..., 'text') +BEGIN; +-- ROW(...) types switch the output format to text +SELECT broadcast_intermediate_result('stored_squares_1', + 'SELECT s, s*s, ROW(1::text, 2) FROM generate_series(1,3) s'), + broadcast_intermediate_result('stored_squares_2', + 'SELECT s, s*s, ROW(2::text, 3) FROM generate_series(4,6) s'); + +-- query the intermediate result in a router query using text format +SELECT * FROM interesting_squares JOIN ( + SELECT * FROM + read_intermediate_results(ARRAY['stored_squares_1', 'stored_squares_2'], 'binary') AS res (x int, x2 int, z intermediate_results.square_type) +) squares +ON (squares.x::text = interested_in) WHERE user_id = 'jon' ORDER BY 1,2; + +END; + + DROP SCHEMA intermediate_results CASCADE; From 249508d2671d19fa21872dac7897fb574cd340f8 Mon Sep 17 00:00:00 2001 From: Hadi Moshayedi Date: Mon, 16 Dec 2019 15:35:04 -0800 Subject: [PATCH 2/2] Estimate cost of read_intermediate_results() --- .../distributed/metadata/metadata_cache.c | 21 +++ .../distributed/planner/distributed_planner.c | 168 +++++++++++++----- .../planner/multi_logical_planner.c | 38 +++- src/include/distributed/metadata_cache.h | 1 + .../distributed/multi_logical_planner.h | 1 + .../regress/expected/intermediate_results.out | 68 +++++-- src/test/regress/sql/intermediate_results.sql | 22 ++- 7 files changed, 257 insertions(+), 62 deletions(-) diff --git a/src/backend/distributed/metadata/metadata_cache.c b/src/backend/distributed/metadata/metadata_cache.c index ceb7d0748..5613fe801 100644 --- 
a/src/backend/distributed/metadata/metadata_cache.c +++ b/src/backend/distributed/metadata/metadata_cache.c @@ -136,6 +136,7 @@ typedef struct MetadataCacheData Oid citusCatalogNamespaceId; Oid copyFormatTypeId; Oid readIntermediateResultFuncId; + Oid readIntermediateResultArrayFuncId; Oid extraDataContainerFuncId; Oid workerHashFunctionId; Oid anyValueFunctionId; @@ -2065,6 +2066,26 @@ CitusReadIntermediateResultFuncId(void) } +/* return oid of the read_intermediate_results(text[],citus_copy_format) function */ +Oid +CitusReadIntermediateResultArrayFuncId(void) +{ + if (MetadataCache.readIntermediateResultArrayFuncId == InvalidOid) + { + List *functionNameList = list_make2(makeString("pg_catalog"), + makeString("read_intermediate_results")); + Oid copyFormatTypeOid = CitusCopyFormatTypeId(); + Oid paramOids[2] = { TEXTARRAYOID, copyFormatTypeOid }; + bool missingOK = false; + + MetadataCache.readIntermediateResultArrayFuncId = + LookupFuncName(functionNameList, 2, paramOids, missingOK); + } + + return MetadataCache.readIntermediateResultArrayFuncId; +} + + /* return oid of the citus.copy_format enum type */ Oid CitusCopyFormatTypeId(void) diff --git a/src/backend/distributed/planner/distributed_planner.c b/src/backend/distributed/planner/distributed_planner.c index 4dfeedbd3..07641f272 100644 --- a/src/backend/distributed/planner/distributed_planner.c +++ b/src/backend/distributed/planner/distributed_planner.c @@ -99,6 +99,13 @@ static void CheckNodeIsDumpable(Node *node); static Node * CheckNodeCopyAndSerialization(Node *node); static void AdjustReadIntermediateResultCost(RangeTblEntry *rangeTableEntry, RelOptInfo *relOptInfo); +static void AdjustReadIntermediateResultArrayCost(RangeTblEntry *rangeTableEntry, + RelOptInfo *relOptInfo); +static void AdjustReadIntermediateResultsCostInternal(RelOptInfo *relOptInfo, + List *columnTypes, + int resultIdCount, + Datum *resultIds, + Const *resultFormatConst); static List * OuterPlanParamsList(PlannerInfo *root); static 
List * CopyPlanParamList(List *originalPlanParamList); static PlannerRestrictionContext * CreateAndPushPlannerRestrictionContext(void); @@ -1517,6 +1524,7 @@ multi_relation_restriction_hook(PlannerInfo *root, RelOptInfo *relOptInfo, DistTableCacheEntry *cacheEntry = NULL; AdjustReadIntermediateResultCost(rte, relOptInfo); + AdjustReadIntermediateResultArrayCost(rte, relOptInfo); if (rte->rtekind != RTE_RELATION) { @@ -1578,30 +1586,6 @@ multi_relation_restriction_hook(PlannerInfo *root, RelOptInfo *relOptInfo, static void AdjustReadIntermediateResultCost(RangeTblEntry *rangeTableEntry, RelOptInfo *relOptInfo) { - PathTarget *reltarget = relOptInfo->reltarget; - List *pathList = relOptInfo->pathlist; - Path *path = NULL; - RangeTblFunction *rangeTableFunction = NULL; - FuncExpr *funcExpression = NULL; - Const *resultFormatConst = NULL; - Datum resultFormatDatum = 0; - Oid resultFormatId = InvalidOid; - Const *resultIdConst = NULL; - Datum resultIdDatum = 0; - char *resultId = NULL; - int64 resultSize = 0; - ListCell *typeCell = NULL; - bool binaryFormat = false; - double rowCost = 0.; - double rowSizeEstimate = 0; - double rowCountEstimate = 0.; - double ioCost = 0.; -#if PG_VERSION_NUM >= 120000 - QualCost funcCost = { 0., 0. 
}; -#else - double funcCost = 0.; -#endif - if (rangeTableEntry->rtekind != RTE_FUNCTION || list_length(rangeTableEntry->functions) != 1) { @@ -1620,41 +1604,133 @@ AdjustReadIntermediateResultCost(RangeTblEntry *rangeTableEntry, RelOptInfo *rel return; } - rangeTableFunction = (RangeTblFunction *) linitial(rangeTableEntry->functions); - funcExpression = (FuncExpr *) rangeTableFunction->funcexpr; - resultIdConst = (Const *) linitial(funcExpression->args); + RangeTblFunction *rangeTableFunction = (RangeTblFunction *) linitial( + rangeTableEntry->functions); + FuncExpr *funcExpression = (FuncExpr *) rangeTableFunction->funcexpr; + Const *resultIdConst = (Const *) linitial(funcExpression->args); if (!IsA(resultIdConst, Const)) { /* not sure how to interpret non-const */ return; } - resultIdDatum = resultIdConst->constvalue; - resultId = TextDatumGetCString(resultIdDatum); + Datum resultIdDatum = resultIdConst->constvalue; - resultSize = IntermediateResultSize(resultId); - if (resultSize < 0) - { - /* result does not exist, will probably error out later on */ - return; - } - - resultFormatConst = (Const *) lsecond(funcExpression->args); + Const *resultFormatConst = (Const *) lsecond(funcExpression->args); if (!IsA(resultFormatConst, Const)) { /* not sure how to interpret non-const */ return; } - resultFormatDatum = resultFormatConst->constvalue; - resultFormatId = DatumGetObjectId(resultFormatDatum); + AdjustReadIntermediateResultsCostInternal(relOptInfo, + rangeTableFunction->funccoltypes, + 1, &resultIdDatum, resultFormatConst); +} - if (resultFormatId == BinaryCopyFormatId()) + +/* + * AdjustReadIntermediateResultArrayCost adjusts the row count and total cost + * of a read_intermediate_results(resultIds, format) call based on the file size. 
+ */ +static void +AdjustReadIntermediateResultArrayCost(RangeTblEntry *rangeTableEntry, + RelOptInfo *relOptInfo) +{ + Datum *resultIdArray = NULL; + int resultIdCount = 0; + + if (rangeTableEntry->rtekind != RTE_FUNCTION || + list_length(rangeTableEntry->functions) != 1) { - binaryFormat = true; + /* avoid more expensive checks below for non-functions */ + return; + } - /* subtract 11-byte signature + 8 byte header + 2-byte footer */ - resultSize -= 21; + if (!CitusHasBeenLoaded() || !CheckCitusVersion(DEBUG5)) + { + /* read_intermediate_result may not exist */ + return; + } + + if (!ContainsReadIntermediateResultArrayFunction((Node *) rangeTableEntry->functions)) + { + return; + } + + RangeTblFunction *rangeTableFunction = + (RangeTblFunction *) linitial(rangeTableEntry->functions); + FuncExpr *funcExpression = (FuncExpr *) rangeTableFunction->funcexpr; + Const *resultIdConst = (Const *) linitial(funcExpression->args); + if (!IsA(resultIdConst, Const)) + { + /* not sure how to interpret non-const */ + return; + } + + Datum resultIdArrayDatum = resultIdConst->constvalue; + deconstruct_array(DatumGetArrayTypeP(resultIdArrayDatum), TEXTOID, -1, false, + 'i', &resultIdArray, NULL, &resultIdCount); + + Const *resultFormatConst = (Const *) lsecond(funcExpression->args); + if (!IsA(resultFormatConst, Const)) + { + /* not sure how to interpret non-const */ + return; + } + + AdjustReadIntermediateResultsCostInternal(relOptInfo, + rangeTableFunction->funccoltypes, + resultIdCount, resultIdArray, + resultFormatConst); +} + + +/* + * AdjustReadIntermediateResultsCostInternal adjusts the row count and total cost + * of reading intermediate results based on file sizes. 
+ */ +static void +AdjustReadIntermediateResultsCostInternal(RelOptInfo *relOptInfo, List *columnTypes, + int resultIdCount, Datum *resultIds, + Const *resultFormatConst) +{ + PathTarget *reltarget = relOptInfo->reltarget; + List *pathList = relOptInfo->pathlist; + Path *path = NULL; + double rowCost = 0.; + double rowSizeEstimate = 0; + double rowCountEstimate = 0.; + double ioCost = 0.; +#if PG_VERSION_NUM >= 120000 + QualCost funcCost = { 0., 0. }; +#else + double funcCost = 0.; +#endif + int64 totalResultSize = 0; + ListCell *typeCell = NULL; + + Datum resultFormatDatum = resultFormatConst->constvalue; + Oid resultFormatId = DatumGetObjectId(resultFormatDatum); + bool binaryFormat = (resultFormatId == BinaryCopyFormatId()); + + for (int index = 0; index < resultIdCount; index++) + { + char *resultId = TextDatumGetCString(resultIds[index]); + int64 resultSize = IntermediateResultSize(resultId); + if (resultSize < 0) + { + /* result does not exist, will probably error out later on */ + return; + } + + if (binaryFormat) + { + /* subtract 11-byte signature + 8 byte header + 2-byte footer */ + totalResultSize -= 21; + } + + totalResultSize += resultSize; } /* start with the cost of evaluating quals */ @@ -1666,7 +1742,7 @@ AdjustReadIntermediateResultCost(RangeTblEntry *rangeTableEntry, RelOptInfo *rel /* add 2 bytes for column count (binary) or line separator (text) */ rowSizeEstimate += 2; - foreach(typeCell, rangeTableFunction->funccoltypes) + foreach(typeCell, columnTypes) { Oid columnTypeId = lfirst_oid(typeCell); Oid inputFunctionId = InvalidOid; @@ -1702,10 +1778,10 @@ AdjustReadIntermediateResultCost(RangeTblEntry *rangeTableEntry, RelOptInfo *rel #endif /* estimate the number of rows based on the file size and estimated row size */ - rowCountEstimate = Max(1, (double) resultSize / rowSizeEstimate); + rowCountEstimate = Max(1, (double) totalResultSize / rowSizeEstimate); /* cost of reading the data */ - ioCost = seq_page_cost * resultSize / BLCKSZ; + ioCost 
= seq_page_cost * totalResultSize / BLCKSZ; Assert(pathList != NIL); diff --git a/src/backend/distributed/planner/multi_logical_planner.c b/src/backend/distributed/planner/multi_logical_planner.c index e735a28ac..4aa53c2e1 100644 --- a/src/backend/distributed/planner/multi_logical_planner.c +++ b/src/backend/distributed/planner/multi_logical_planner.c @@ -78,6 +78,8 @@ static bool ErrorHintRequired(const char *errorHint, Query *queryTree); static bool HasTablesample(Query *queryTree); static bool HasComplexRangeTableType(Query *queryTree); static bool IsReadIntermediateResultFunction(Node *node); +static bool IsReadIntermediateResultArrayFunction(Node *node); +static bool IsFunctionWithOid(Node *node, Oid funcOid); static bool ExtractFromExpressionWalker(Node *node, QualifierWalkerContext *walkerContext); static List * MultiTableNodeList(List *tableEntryList, List *rangeTableList); @@ -768,18 +770,52 @@ ContainsReadIntermediateResultFunction(Node *node) } +/* + * ContainsReadIntermediateResultArrayFunction determines whether an expression + * tree contains a call to the read_intermediate_results(result_ids, format) + * function. + */ +bool +ContainsReadIntermediateResultArrayFunction(Node *node) +{ + return FindNodeCheck(node, IsReadIntermediateResultArrayFunction); +} + + /* * IsReadIntermediateResultFunction determines whether a given node is a function call * to the read_intermediate_result function. */ static bool IsReadIntermediateResultFunction(Node *node) +{ + return IsFunctionWithOid(node, CitusReadIntermediateResultFuncId()); +} + + +/* + * IsReadIntermediateResultArrayFunction determines whether a given node is a + * function call to the read_intermediate_results(result_ids, format) function. 
+ */ +static bool +IsReadIntermediateResultArrayFunction(Node *node) +{ + return IsFunctionWithOid(node, CitusReadIntermediateResultArrayFuncId()); +} + + +/* + * IsFunctionWithOid determines whether a given node is a function call + * to the function with the given oid. + */ +static bool +IsFunctionWithOid(Node *node, Oid funcOid) { if (IsA(node, FuncExpr)) { FuncExpr *funcExpr = (FuncExpr *) node; - if (funcExpr->funcid == CitusReadIntermediateResultFuncId()) + if (funcExpr->funcid == funcOid) { return true; } diff --git a/src/include/distributed/metadata_cache.h b/src/include/distributed/metadata_cache.h index 25b04c022..e3d9a5085 100644 --- a/src/include/distributed/metadata_cache.h +++ b/src/include/distributed/metadata_cache.h @@ -188,6 +188,7 @@ extern Oid CitusCopyFormatTypeId(void); /* function oids */ extern Oid CitusReadIntermediateResultFuncId(void); +Oid CitusReadIntermediateResultArrayFuncId(void); extern Oid CitusExtraDataContainerFuncId(void); extern Oid CitusWorkerHashFunctionId(void); extern Oid CitusAnyValueFunctionId(void); diff --git a/src/include/distributed/multi_logical_planner.h b/src/include/distributed/multi_logical_planner.h index eb0a56a58..22b45ecd8 100644 --- a/src/include/distributed/multi_logical_planner.h +++ b/src/include/distributed/multi_logical_planner.h @@ -193,6 +193,7 @@ extern bool FindNodeCheckInRangeTableList(List *rtable, bool (*check)(Node *)); extern bool IsDistributedTableRTE(Node *node); extern bool QueryContainsDistributedTableRTE(Query *query); extern bool ContainsReadIntermediateResultFunction(Node *node); +extern bool ContainsReadIntermediateResultArrayFunction(Node *node); extern char * FindIntermediateResultIdIfExists(RangeTblEntry *rte); extern MultiNode * ParentNode(MultiNode *multiNode); extern MultiNode * ChildNode(MultiUnaryNode *multiNode); diff --git a/src/test/regress/expected/intermediate_results.out b/src/test/regress/expected/intermediate_results.out index 93dcc8e69..fd2995697 100644 --- 
a/src/test/regress/expected/intermediate_results.out +++ b/src/test/regress/expected/intermediate_results.out @@ -201,10 +201,10 @@ SELECT create_intermediate_result('squares', 'SELECT s, s*s FROM generate_series 632 (1 row) -EXPLAIN (COSTS OFF) SELECT * FROM read_intermediate_result('squares', 'binary') AS res (x int, x2 int); - QUERY PLAN ------------------------------------------------ - Function Scan on read_intermediate_result res +EXPLAIN (COSTS ON) SELECT * FROM read_intermediate_result('squares', 'binary') AS res (x int, x2 int); + QUERY PLAN +----------------------------------------------------------------------------------- + Function Scan on read_intermediate_result res (cost=0.00..4.55 rows=632 width=8) (1 row) -- less accurate results for variable types @@ -214,10 +214,10 @@ SELECT create_intermediate_result('hellos', $$SELECT s, 'hello-'||s FROM generat 63 (1 row) -EXPLAIN (COSTS OFF) SELECT * FROM read_intermediate_result('hellos', 'binary') AS res (x int, y text); - QUERY PLAN ------------------------------------------------ - Function Scan on read_intermediate_result res +EXPLAIN (COSTS ON) SELECT * FROM read_intermediate_result('hellos', 'binary') AS res (x int, y text); + QUERY PLAN +----------------------------------------------------------------------------------- + Function Scan on read_intermediate_result res (cost=0.00..0.32 rows=30 width=36) (1 row) -- not very accurate results for text encoding @@ -227,10 +227,10 @@ SELECT create_intermediate_result('stored_squares', 'SELECT square FROM stored_s 4 (1 row) -EXPLAIN (COSTS OFF) SELECT * FROM read_intermediate_result('stored_squares', 'text') AS res (s intermediate_results.square_type); - QUERY PLAN ------------------------------------------------ - Function Scan on read_intermediate_result res +EXPLAIN (COSTS ON) SELECT * FROM read_intermediate_result('stored_squares', 'text') AS res (s intermediate_results.square_type); + QUERY PLAN 
+---------------------------------------------------------------------------------- + Function Scan on read_intermediate_result res (cost=0.00..0.01 rows=1 width=32) (1 row) END; @@ -368,6 +368,50 @@ ON (squares.x::text = interested_in) WHERE user_id = 'jon' ORDER BY 1,2; jon | 5 | 5 | 25 | (2,3) (2 rows) +END; +-- Cost estimation for read_intermediate_results +BEGIN; +-- almost accurate row count estimates for primitive types +SELECT create_intermediate_result('squares_1', 'SELECT s, s*s FROM generate_series(1,632) s'), + create_intermediate_result('squares_2', 'SELECT s, s*s FROM generate_series(633,1024) s'); + create_intermediate_result | create_intermediate_result +----------------------------+---------------------------- + 632 | 392 +(1 row) + +EXPLAIN (COSTS ON) SELECT * FROM read_intermediate_results(ARRAY['squares_1', 'squares_2'], 'binary') AS res (x int, x2 int); + QUERY PLAN +------------------------------------------------------------------------------------- + Function Scan on read_intermediate_results res (cost=0.00..7.37 rows=1024 width=8) +(1 row) + +-- less accurate results for variable types +SELECT create_intermediate_result('hellos_1', $$SELECT s, 'hello-'||s FROM generate_series(1,63) s$$), + create_intermediate_result('hellos_2', $$SELECT s, 'hello-'||s FROM generate_series(64,129) s$$); + create_intermediate_result | create_intermediate_result +----------------------------+---------------------------- + 63 | 66 +(1 row) + +EXPLAIN (COSTS ON) SELECT * FROM read_intermediate_results(ARRAY['hellos_1', 'hellos_2'], 'binary') AS res (x int, y text); + QUERY PLAN +------------------------------------------------------------------------------------ + Function Scan on read_intermediate_results res (cost=0.00..0.66 rows=62 width=36) +(1 row) + +-- not very accurate results for text encoding +SELECT create_intermediate_result('stored_squares', 'SELECT square FROM stored_squares'); + create_intermediate_result +---------------------------- + 4 +(1 row) 
+ +EXPLAIN (COSTS ON) SELECT * FROM read_intermediate_results(ARRAY['stored_squares'], 'text') AS res (s intermediate_results.square_type); + QUERY PLAN +----------------------------------------------------------------------------------- + Function Scan on read_intermediate_results res (cost=0.00..0.01 rows=1 width=32) +(1 row) + END; DROP SCHEMA intermediate_results CASCADE; NOTICE: drop cascades to 5 other objects diff --git a/src/test/regress/sql/intermediate_results.sql b/src/test/regress/sql/intermediate_results.sql index bed8c7901..cdb5797b9 100644 --- a/src/test/regress/sql/intermediate_results.sql +++ b/src/test/regress/sql/intermediate_results.sql @@ -121,15 +121,15 @@ END; BEGIN; -- accurate row count estimates for primitive types SELECT create_intermediate_result('squares', 'SELECT s, s*s FROM generate_series(1,632) s'); -EXPLAIN (COSTS OFF) SELECT * FROM read_intermediate_result('squares', 'binary') AS res (x int, x2 int); +EXPLAIN (COSTS ON) SELECT * FROM read_intermediate_result('squares', 'binary') AS res (x int, x2 int); -- less accurate results for variable types SELECT create_intermediate_result('hellos', $$SELECT s, 'hello-'||s FROM generate_series(1,63) s$$); -EXPLAIN (COSTS OFF) SELECT * FROM read_intermediate_result('hellos', 'binary') AS res (x int, y text); +EXPLAIN (COSTS ON) SELECT * FROM read_intermediate_result('hellos', 'binary') AS res (x int, y text); -- not very accurate results for text encoding SELECT create_intermediate_result('stored_squares', 'SELECT square FROM stored_squares'); -EXPLAIN (COSTS OFF) SELECT * FROM read_intermediate_result('stored_squares', 'text') AS res (s intermediate_results.square_type); +EXPLAIN (COSTS ON) SELECT * FROM read_intermediate_result('stored_squares', 'text') AS res (s intermediate_results.square_type); END; -- pipe query output into a result file and create a table to check the result @@ -201,5 +201,21 @@ ON (squares.x::text = interested_in) WHERE user_id = 'jon' ORDER BY 1,2; END; +-- Cost 
estimation for read_intermediate_results +BEGIN; +-- almost accurate row count estimates for primitive types +SELECT create_intermediate_result('squares_1', 'SELECT s, s*s FROM generate_series(1,632) s'), + create_intermediate_result('squares_2', 'SELECT s, s*s FROM generate_series(633,1024) s'); +EXPLAIN (COSTS ON) SELECT * FROM read_intermediate_results(ARRAY['squares_1', 'squares_2'], 'binary') AS res (x int, x2 int); + +-- less accurate results for variable types +SELECT create_intermediate_result('hellos_1', $$SELECT s, 'hello-'||s FROM generate_series(1,63) s$$), + create_intermediate_result('hellos_2', $$SELECT s, 'hello-'||s FROM generate_series(64,129) s$$); +EXPLAIN (COSTS ON) SELECT * FROM read_intermediate_results(ARRAY['hellos_1', 'hellos_2'], 'binary') AS res (x int, y text); + +-- not very accurate results for text encoding +SELECT create_intermediate_result('stored_squares', 'SELECT square FROM stored_squares'); +EXPLAIN (COSTS ON) SELECT * FROM read_intermediate_results(ARRAY['stored_squares'], 'text') AS res (s intermediate_results.square_type); +END; DROP SCHEMA intermediate_results CASCADE;