mirror of https://github.com/citusdata/citus.git
Treat read_intermediate_result as recurring tuples
parent
716448ddef
commit
7279d42849
|
@ -67,7 +67,8 @@ typedef enum RecurringTuplesType
|
|||
RECURRING_TUPLES_INVALID = 0,
|
||||
RECURRING_TUPLES_REFERENCE_TABLE,
|
||||
RECURRING_TUPLES_FUNCTION,
|
||||
RECURRING_TUPLES_EMPTY_JOIN_TREE
|
||||
RECURRING_TUPLES_EMPTY_JOIN_TREE,
|
||||
RECURRING_TUPLES_RESULT_FUNCTION
|
||||
} RecurringTuplesType;
|
||||
|
||||
|
||||
|
@ -129,6 +130,8 @@ static bool RelationInfoContainsRecurringTuples(PlannerInfo *plannerInfo,
|
|||
RelOptInfo *relationInfo,
|
||||
RecurringTuplesType *recurType);
|
||||
static bool HasRecurringTuples(Node *node, RecurringTuplesType *recurType);
|
||||
static bool ContainsReadIntermediateResultFunction(Node *node);
|
||||
static bool IsReadIntermediateResultFunction(Node *node);
|
||||
static void ValidateClauseList(List *clauseList);
|
||||
static bool ExtractFromExpressionWalker(Node *node,
|
||||
QualifierWalkerContext *walkerContext);
|
||||
|
@ -952,6 +955,14 @@ DeferErrorIfUnsupportedSublinkAndReferenceTable(Query *queryTree)
|
|||
"clause when the query has subqueries in "
|
||||
"WHERE clause", NULL);
|
||||
}
|
||||
else if (recurType == RECURRING_TUPLES_RESULT_FUNCTION)
|
||||
{
|
||||
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
|
||||
"cannot pushdown the subquery",
|
||||
"Complex subqueries and CTEs are not allowed in "
|
||||
"the FROM clause when the query has subqueries in the "
|
||||
"WHERE clause", NULL);
|
||||
}
|
||||
else
|
||||
{
|
||||
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
|
||||
|
@ -1218,6 +1229,13 @@ DeferErrorIfUnsupportedUnionQuery(Query *subqueryTree,
|
|||
"Subqueries without a FROM clause are not supported with "
|
||||
"union operator", NULL);
|
||||
}
|
||||
else if (recurType == RECURRING_TUPLES_RESULT_FUNCTION)
|
||||
{
|
||||
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
|
||||
"cannot push down this subquery",
|
||||
"Complex subqueries and CTEs are not supported within a "
|
||||
"UNION", NULL);
|
||||
}
|
||||
|
||||
|
||||
return NULL;
|
||||
|
@ -1297,7 +1315,18 @@ DeferErrorIfUnsupportedTableCombination(Query *queryTree)
|
|||
}
|
||||
else if (rangeTableEntry->rtekind == RTE_FUNCTION)
|
||||
{
|
||||
if (contain_mutable_functions((Node *) rangeTableEntry->functions))
|
||||
List *functionList = rangeTableEntry->functions;
|
||||
|
||||
if (list_length(functionList) == 1 &&
|
||||
ContainsReadIntermediateResultFunction(linitial(functionList)))
|
||||
{
|
||||
/*
|
||||
* The read_intermediate_result function is volatile, but we know
|
||||
* it has the same result across all nodes and can therefore treat
|
||||
* it as a reference table.
|
||||
*/
|
||||
}
|
||||
else if (contain_mutable_functions((Node *) functionList))
|
||||
{
|
||||
unsupportedTableCombination = true;
|
||||
errorDetail = "Only immutable functions can be used as a table "
|
||||
|
@ -2003,7 +2032,13 @@ DeferredErrorIfUnsupportedRecurringTuplesJoin(
|
|||
"There exist a subquery without FROM in the outer "
|
||||
"part of the outer join", NULL);
|
||||
}
|
||||
|
||||
else if (recurType == RECURRING_TUPLES_RESULT_FUNCTION)
|
||||
{
|
||||
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
|
||||
"cannot pushdown the subquery",
|
||||
"Complex subqueries and CTEs cannot be in the outer "
|
||||
"part of the outer join", NULL);
|
||||
}
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
@ -2141,7 +2176,17 @@ HasRecurringTuples(Node *node, RecurringTuplesType *recurType)
|
|||
}
|
||||
else if (rangeTableEntry->rtekind == RTE_FUNCTION)
|
||||
{
|
||||
*recurType = RECURRING_TUPLES_FUNCTION;
|
||||
List *functionList = rangeTableEntry->functions;
|
||||
|
||||
if (list_length(functionList) == 1 &&
|
||||
ContainsReadIntermediateResultFunction((Node *) functionList))
|
||||
{
|
||||
*recurType = RECURRING_TUPLES_RESULT_FUNCTION;
|
||||
}
|
||||
else
|
||||
{
|
||||
*recurType = RECURRING_TUPLES_FUNCTION;
|
||||
}
|
||||
|
||||
/*
|
||||
* Tuples from functions will recur in every query on shards that includes
|
||||
|
@ -2176,6 +2221,38 @@ HasRecurringTuples(Node *node, RecurringTuplesType *recurType)
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* ContainsReadIntermediateResultFunction determines whether an expresion tree contains
|
||||
* a call to the read_intermediate_results function.
|
||||
*/
|
||||
static bool
|
||||
ContainsReadIntermediateResultFunction(Node *node)
|
||||
{
|
||||
return FindNodeCheck(node, IsReadIntermediateResultFunction);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* IsReadIntermediateResultFunction determines whether a given node is a function call
|
||||
* to the read_intermediate_result function.
|
||||
*/
|
||||
static bool
|
||||
IsReadIntermediateResultFunction(Node *node)
|
||||
{
|
||||
if (IsA(node, FuncExpr))
|
||||
{
|
||||
FuncExpr *funcExpr = (FuncExpr *) node;
|
||||
|
||||
if (funcExpr->funcid == CitusReadIntermediateResultFuncId())
|
||||
{
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* ErrorIfQueryNotSupported checks that we can perform distributed planning for
|
||||
* the given query. The checks in this function will be removed as we support
|
||||
|
|
|
@ -117,6 +117,7 @@ typedef struct MetadataCacheData
|
|||
Oid distTransactionRelationId;
|
||||
Oid distTransactionGroupIndexId;
|
||||
Oid distTransactionRecordIndexId;
|
||||
Oid readIntermediateResultFuncId;
|
||||
Oid extraDataContainerFuncId;
|
||||
Oid workerHashFunctionId;
|
||||
Oid extensionOwner;
|
||||
|
@ -1834,6 +1835,31 @@ DistPlacementGroupidIndexId(void)
|
|||
}
|
||||
|
||||
|
||||
/* return oid of the read_intermediate_result(text,citus.copy_format) function */
|
||||
Oid
|
||||
CitusReadIntermediateResultFuncId(void)
|
||||
{
|
||||
if (MetadataCache.readIntermediateResultFuncId == InvalidOid)
|
||||
{
|
||||
bool missingOK = false;
|
||||
|
||||
List *copyFormatTypeNameList = list_make2(makeString("citus"),
|
||||
makeString("copy_format"));
|
||||
TypeName *copyFormatTypeName = makeTypeNameFromNameList(copyFormatTypeNameList);
|
||||
Oid copyFormatTypeOid = LookupTypeNameOid(NULL, copyFormatTypeName, missingOK);
|
||||
|
||||
List *functionNameList = list_make2(makeString("pg_catalog"),
|
||||
makeString("read_intermediate_result"));
|
||||
Oid paramOids[2] = { TEXTOID, copyFormatTypeOid };
|
||||
|
||||
MetadataCache.readIntermediateResultFuncId =
|
||||
LookupFuncName(functionNameList, 2, paramOids, missingOK);
|
||||
}
|
||||
|
||||
return MetadataCache.readIntermediateResultFuncId;
|
||||
}
|
||||
|
||||
|
||||
/* return oid of the citus_extradata_container(internal) function */
|
||||
Oid
|
||||
CitusExtraDataContainerFuncId(void)
|
||||
|
|
|
@ -124,6 +124,7 @@ extern Oid DistTransactionRecordIndexId(void);
|
|||
extern Oid DistPlacementGroupidIndexId(void);
|
||||
|
||||
/* function oids */
|
||||
extern Oid CitusReadIntermediateResultFuncId(void);
|
||||
extern Oid CitusExtraDataContainerFuncId(void);
|
||||
extern Oid CitusWorkerHashFunctionId(void);
|
||||
|
||||
|
|
Loading…
Reference in New Issue