From 7279d428496baf3c11e77f782ca1b475a822c749 Mon Sep 17 00:00:00 2001 From: Marco Slot Date: Thu, 30 Nov 2017 13:21:38 +0100 Subject: [PATCH] Treat read_intermediate_result as recurring tuples --- .../planner/multi_logical_planner.c | 85 ++++++++++++++++++- .../distributed/utils/metadata_cache.c | 26 ++++++ src/include/distributed/metadata_cache.h | 1 + 3 files changed, 108 insertions(+), 4 deletions(-) diff --git a/src/backend/distributed/planner/multi_logical_planner.c b/src/backend/distributed/planner/multi_logical_planner.c index fc4891d67..ae38ded87 100644 --- a/src/backend/distributed/planner/multi_logical_planner.c +++ b/src/backend/distributed/planner/multi_logical_planner.c @@ -67,7 +67,8 @@ typedef enum RecurringTuplesType RECURRING_TUPLES_INVALID = 0, RECURRING_TUPLES_REFERENCE_TABLE, RECURRING_TUPLES_FUNCTION, - RECURRING_TUPLES_EMPTY_JOIN_TREE + RECURRING_TUPLES_EMPTY_JOIN_TREE, + RECURRING_TUPLES_RESULT_FUNCTION } RecurringTuplesType; @@ -129,6 +130,8 @@ static bool RelationInfoContainsRecurringTuples(PlannerInfo *plannerInfo, RelOptInfo *relationInfo, RecurringTuplesType *recurType); static bool HasRecurringTuples(Node *node, RecurringTuplesType *recurType); +static bool ContainsReadIntermediateResultFunction(Node *node); +static bool IsReadIntermediateResultFunction(Node *node); static void ValidateClauseList(List *clauseList); static bool ExtractFromExpressionWalker(Node *node, QualifierWalkerContext *walkerContext); @@ -952,6 +955,14 @@ DeferErrorIfUnsupportedSublinkAndReferenceTable(Query *queryTree) "clause when the query has subqueries in " "WHERE clause", NULL); } + else if (recurType == RECURRING_TUPLES_RESULT_FUNCTION) + { + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "cannot pushdown the subquery", + "Complex subqueries and CTEs are not allowed in " + "the FROM clause when the query has subqueries in the " + "WHERE clause", NULL); + } else { return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, @@ -1218,6 +1229,13 @@ DeferErrorIfUnsupportedUnionQuery(Query *subqueryTree, "Subqueries without a FROM clause are not supported with " "union operator", NULL); } + else if (recurType == RECURRING_TUPLES_RESULT_FUNCTION) + { + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "cannot push down this subquery", + "Complex subqueries and CTEs are not supported within a " + "UNION", NULL); + } return NULL; @@ -1297,7 +1315,18 @@ DeferErrorIfUnsupportedTableCombination(Query *queryTree) } else if (rangeTableEntry->rtekind == RTE_FUNCTION) { - if (contain_mutable_functions((Node *) rangeTableEntry->functions)) + List *functionList = rangeTableEntry->functions; + + if (list_length(functionList) == 1 && + ContainsReadIntermediateResultFunction(linitial(functionList))) + { + /* + * The read_intermediate_result function is volatile, but we know + * it has the same result across all nodes and can therefore treat + * it as a reference table. + */ + } + else if (contain_mutable_functions((Node *) functionList)) { unsupportedTableCombination = true; errorDetail = "Only immutable functions can be used as a table " @@ -2003,7 +2032,13 @@ DeferredErrorIfUnsupportedRecurringTuplesJoin( "There exist a subquery without FROM in the outer " "part of the outer join", NULL); } - + else if (recurType == RECURRING_TUPLES_RESULT_FUNCTION) + { + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "cannot pushdown the subquery", + "Complex subqueries and CTEs cannot be in the outer " + "part of the outer join", NULL); + } return NULL; } @@ -2141,7 +2176,17 @@ HasRecurringTuples(Node *node, RecurringTuplesType *recurType) } else if (rangeTableEntry->rtekind == RTE_FUNCTION) { - *recurType = RECURRING_TUPLES_FUNCTION; + List *functionList = rangeTableEntry->functions; + + if (list_length(functionList) == 1 && + ContainsReadIntermediateResultFunction((Node *) functionList)) + { + *recurType = RECURRING_TUPLES_RESULT_FUNCTION; + } + else + { + *recurType = RECURRING_TUPLES_FUNCTION; + } /* * Tuples from functions will recur in every query on shards that includes @@ -2176,6 +2221,38 @@ HasRecurringTuples(Node *node, RecurringTuplesType *recurType) } +/* + * ContainsReadIntermediateResultFunction determines whether an expresion tree contains + * a call to the read_intermediate_results function. + */ +static bool +ContainsReadIntermediateResultFunction(Node *node) +{ + return FindNodeCheck(node, IsReadIntermediateResultFunction); +} + + +/* + * IsReadIntermediateResultFunction determines whether a given node is a function call + * to the read_intermediate_result function. + */ +static bool +IsReadIntermediateResultFunction(Node *node) +{ + if (IsA(node, FuncExpr)) + { + FuncExpr *funcExpr = (FuncExpr *) node; + + if (funcExpr->funcid == CitusReadIntermediateResultFuncId()) + { + return true; + } + } + + return false; +} + + /* * ErrorIfQueryNotSupported checks that we can perform distributed planning for * the given query. The checks in this function will be removed as we support diff --git a/src/backend/distributed/utils/metadata_cache.c b/src/backend/distributed/utils/metadata_cache.c index 7f5420d41..c7a1b4e37 100644 --- a/src/backend/distributed/utils/metadata_cache.c +++ b/src/backend/distributed/utils/metadata_cache.c @@ -117,6 +117,7 @@ typedef struct MetadataCacheData Oid distTransactionRelationId; Oid distTransactionGroupIndexId; Oid distTransactionRecordIndexId; + Oid readIntermediateResultFuncId; Oid extraDataContainerFuncId; Oid workerHashFunctionId; Oid extensionOwner; @@ -1834,6 +1835,31 @@ DistPlacementGroupidIndexId(void) } +/* return oid of the read_intermediate_result(text,citus.copy_format) function */ +Oid +CitusReadIntermediateResultFuncId(void) +{ + if (MetadataCache.readIntermediateResultFuncId == InvalidOid) + { + bool missingOK = false; + + List *copyFormatTypeNameList = list_make2(makeString("citus"), + makeString("copy_format")); + TypeName *copyFormatTypeName = makeTypeNameFromNameList(copyFormatTypeNameList); + Oid copyFormatTypeOid = LookupTypeNameOid(NULL, copyFormatTypeName, missingOK); + + List *functionNameList = list_make2(makeString("pg_catalog"), + makeString("read_intermediate_result")); + Oid paramOids[2] = { TEXTOID, copyFormatTypeOid }; + + MetadataCache.readIntermediateResultFuncId = + LookupFuncName(functionNameList, 2, paramOids, missingOK); + } + + return MetadataCache.readIntermediateResultFuncId; +} + + /* return oid of the citus_extradata_container(internal) function */ Oid CitusExtraDataContainerFuncId(void) diff --git a/src/include/distributed/metadata_cache.h b/src/include/distributed/metadata_cache.h index 6e0bfa720..027e3cb7e 100644 --- a/src/include/distributed/metadata_cache.h +++ b/src/include/distributed/metadata_cache.h @@ -124,6 +124,7 @@ extern Oid DistTransactionRecordIndexId(void); extern Oid DistPlacementGroupidIndexId(void); /* function oids */ +extern Oid CitusReadIntermediateResultFuncId(void); extern Oid CitusExtraDataContainerFuncId(void); extern Oid CitusWorkerHashFunctionId(void);