From 40e0ec6ee5baaeb81430f74954f16a644c1a2066 Mon Sep 17 00:00:00 2001 From: Onder Kalaci Date: Tue, 18 Apr 2017 17:49:41 +0300 Subject: [PATCH] Change behaviour of subquery pushdown flag (#1315) This commit changes the behaviour of the citus.subquery_pushdown flag. Before this commit, the flag is used to enable subquery pushdown logic. But, with this commit, that behaviour is enabled by default. In other words, the flag is now useless. We prefer to keep the flag since we don't want to break the backward compatibility. Also, we may consider using that flag for other purposes in the next commits. --- .../planner/multi_logical_optimizer.c | 863 +--------------- .../planner/multi_logical_planner.c | 948 +++++++++++++++++- .../planner/multi_physical_planner.c | 173 ++++ .../distributed/planner/multi_planner.c | 7 +- .../relation_restriction_equivalence.c | 26 + .../distributed/multi_logical_optimizer.h | 4 +- .../distributed/multi_logical_planner.h | 5 +- .../relation_restriction_equivalence.h | 1 + .../expected/multi_complex_expressions.out | 12 +- .../expected/multi_mx_router_planner.out | 4 +- .../regress/expected/multi_router_planner.out | 4 +- .../multi_subquery_behavioral_analytics.out | 2 - .../multi_subquery_complex_queries.out | 6 +- .../regress/expected/multi_subquery_union.out | 2 - src/test/regress/expected/multi_view.out | 29 +- .../input/multi_complex_count_distinct.source | 24 +- src/test/regress/input/multi_subquery.source | 12 +- .../multi_complex_count_distinct.source | 24 +- src/test/regress/output/multi_subquery.source | 21 +- .../regress/output/multi_subquery_0.source | 21 +- .../regress/sql/multi_complex_expressions.sql | 6 +- .../multi_subquery_behavioral_analytics.sql | 2 - .../sql/multi_subquery_complex_queries.sql | 2 - src/test/regress/sql/multi_subquery_union.sql | 2 - src/test/regress/sql/multi_view.sql | 14 +- 25 files changed, 1204 insertions(+), 1010 deletions(-) diff --git a/src/backend/distributed/planner/multi_logical_optimizer.c b/src/backend/distributed/planner/multi_logical_optimizer.c index 184a76481..20945f439 100644 --- a/src/backend/distributed/planner/multi_logical_optimizer.c +++ b/src/backend/distributed/planner/multi_logical_optimizer.c @@ -48,7 +48,6 @@ #include "utils/fmgroids.h" #include "utils/lsyscache.h" #include "utils/rel.h" -#include "utils/relcache.h" #include "utils/syscache.h" #include "utils/tqual.h" @@ -148,28 +147,6 @@ static bool TablePartitioningSupportsDistinct(List *tableNodeList, Var *distinctColumn); static bool GroupedByColumn(List *groupClauseList, List *targetList, Var *column); -/* Local functions forward declarations for subquery pushdown checks */ -static void ErrorIfContainsUnsupportedSubquery(MultiNode *logicalPlanNode, - PlannerRestrictionContext * - plannerRestrictionContext, - Query *originalQuery); -static void ErrorIfCannotPushdownSubquery(Query *subqueryTree, bool outerQueryHasLimit); -static void ErrorIfUnsupportedSetOperation(Query *subqueryTree, bool outerQueryHasLimit); -static bool ExtractSetOperationStatmentWalker(Node *node, List **setOperationList); -static void ErrorIfUnsupportedTableCombination(Query *queryTree); -static bool TargetListOnPartitionColumn(Query *query, List *targetEntryList); -static FieldSelect * CompositeFieldRecursive(Expr *expression, Query *query); -static bool FullCompositeFieldList(List *compositeFieldList); -static void ErrorIfUnsupportedShardDistribution(Query *query); -static List * RelationIdList(Query *query); -static bool CoPartitionedTables(Oid firstRelationId, Oid secondRelationId); -static bool ShardIntervalsEqual(FmgrInfo *comparisonFunction, - ShardInterval *firstInterval, - ShardInterval *secondInterval); -static void ErrorIfUnsupportedFilters(Query *subquery); -static bool EqualOpExpressionLists(List *firstOpExpressionList, - List *secondOpExpressionList); - /* Local functions forward declarations for limit clauses */ static Node * WorkerLimitCount(MultiExtendedOp *originalOpNode); static List * WorkerSortClauseList(MultiExtendedOp *originalOpNode); @@ -192,16 +169,9 @@ static bool HasOrderByHllType(List *sortClauseList, List *targetList); * Third, the function pulls up the collect operators in the tree. Fourth, the * function finds the extended operator node, and splits this node into master * and worker extended operator nodes. - * - * We also pass plannerRestrictionContext and originalQuery to the optimizer. - * These are primarily used to decide whether the subquery is safe to pushdown. - * If not, it helps to produce meaningful error messages for subquery - * pushdown planning. */ void -MultiLogicalPlanOptimize(MultiTreeRoot *multiLogicalPlan, - PlannerRestrictionContext *plannerRestrictionContext, - Query *originalQuery) +MultiLogicalPlanOptimize(MultiTreeRoot *multiLogicalPlan) { bool hasOrderByHllType = false; List *selectNodeList = NIL; @@ -220,10 +190,6 @@ MultiLogicalPlanOptimize(MultiTreeRoot *multiLogicalPlan, /* check that we can optimize aggregates in the plan */ ErrorIfContainsUnsupportedAggregate(logicalPlanNode); - /* check that we can pushdown subquery in the plan */ - ErrorIfContainsUnsupportedSubquery(logicalPlanNode, plannerRestrictionContext, - originalQuery); - /* * If a select node exists, we use the idempower property to split the node * into two nodes that contain And and Or clauses. If both And and Or nodes @@ -2809,81 +2775,6 @@ GroupedByColumn(List *groupClauseList, List *targetList, Var *column) } -/* - * ErrorIfContainsUnsupportedSubquery extracts subquery multi table from the - * logical plan using plannerRestrictionContext and the original query. It uses - * some helper functions to check if we can push down subquery to worker nodes. - * These helper functions error out if we cannot push down the subquery. - */ -static void -ErrorIfContainsUnsupportedSubquery(MultiNode *logicalPlanNode, - PlannerRestrictionContext *plannerRestrictionContext, - Query *originalQuery) -{ - Query *subquery = NULL; - List *extendedOpNodeList = NIL; - MultiTable *multiTable = NULL; - MultiExtendedOp *extendedOpNode = NULL; - bool outerQueryHasLimit = false; - RelationRestrictionContext *relationRestrictionContext = - plannerRestrictionContext->relationRestrictionContext; - - /* check if logical plan includes a subquery */ - List *subqueryMultiTableList = SubqueryMultiTableList(logicalPlanNode); - if (subqueryMultiTableList == NIL) - { - return; - } - - /* currently in the planner we only allow one subquery in from-clause*/ - Assert(list_length(subqueryMultiTableList) == 1); - - /* - * We're checking two things here: - * (i) If the query contains a top level union, ensure that all leaves - * return the partition key at the same position - * (ii) Else, check whether all relations joined on the partition key or not - */ - if (ContainsUnionSubquery(originalQuery)) - { - if (!SafeToPushdownUnionSubquery(relationRestrictionContext)) - { - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("cannot pushdown the subquery since all leaves of " - "the UNION does not include partition key at the " - "same position"), - errdetail("Each leaf query of the UNION should return " - "partition key at the same position on its " - "target list."))); - } - } - else if (!RestrictionEquivalenceForPartitionKeys(plannerRestrictionContext)) - { - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("cannot pushdown the subquery since all relations are not " - "joined using distribution keys"), - errdetail("Each relation should be joined with at least " - "one another relation using distribution keys and " - "equality operator."))); - } - - multiTable = (MultiTable *) linitial(subqueryMultiTableList); - subquery = multiTable->subquery; - - extendedOpNodeList = FindNodesOfType(logicalPlanNode, T_MultiExtendedOp); - extendedOpNode = (MultiExtendedOp *) linitial(extendedOpNodeList); - - if (extendedOpNode->limitCount) - { - outerQueryHasLimit = true; - } - - ErrorIfCannotPushdownSubquery(subquery, outerQueryHasLimit); - ErrorIfUnsupportedShardDistribution(subquery); - ErrorIfUnsupportedFilters(subquery); -} - - /* * SubqueryMultiTableList extracts multi tables in the given logical plan tree * and returns subquery multi tables in a new list. @@ -2910,290 +2801,6 @@ SubqueryMultiTableList(MultiNode *multiNode) } -/* - * ErrorIfCannotPushdownSubquery recursively checks if we can push down the given - * subquery to worker nodes. If we cannot push down the subquery, this function - * errors out. - * - * We can push down a subquery if it follows rules below. We support nested queries - * as long as they follow the same rules, and we recurse to validate each subquery - * for this given query. - * a. If there is an aggregate, it must be grouped on partition column. - * b. If there is a join, it must be between two regular tables or two subqueries. - * We don't support join between a regular table and a subquery. And columns on - * the join condition must be partition columns. - * c. If there is a distinct clause, it must be on the partition column. - * - * This function is very similar to ErrorIfQueryNotSupported() in logical - * planner, but we don't reuse it, because differently for subqueries we support - * a subset of distinct, union and left joins. - * - * Note that this list of checks is not exhaustive, there can be some cases - * which we let subquery to run but returned results would be wrong. Such as if - * a subquery has a group by on another subquery which includes order by with - * limit, we let this query to run, but results could be wrong depending on the - * features of underlying tables. - */ -static void -ErrorIfCannotPushdownSubquery(Query *subqueryTree, bool outerQueryHasLimit) -{ - bool preconditionsSatisfied = true; - char *errorDetail = NULL; - List *subqueryEntryList = NIL; - ListCell *rangeTableEntryCell = NULL; - - ErrorIfUnsupportedTableCombination(subqueryTree); - - if (subqueryTree->hasSubLinks) - { - preconditionsSatisfied = false; - errorDetail = "Subqueries other than from-clause subqueries are unsupported"; - } - - if (subqueryTree->rtable == NIL) - { - preconditionsSatisfied = false; - errorDetail = "Subqueries without relations are unsupported"; - } - - if (subqueryTree->hasWindowFuncs) - { - preconditionsSatisfied = false; - errorDetail = "Window functions are currently unsupported"; - } - - if (subqueryTree->limitOffset) - { - preconditionsSatisfied = false; - errorDetail = "Offset clause is currently unsupported"; - } - - if (subqueryTree->limitCount && !outerQueryHasLimit) - { - preconditionsSatisfied = false; - errorDetail = "Limit in subquery without limit in the outer query is unsupported"; - } - - if (subqueryTree->setOperations) - { - ErrorIfUnsupportedSetOperation(subqueryTree, outerQueryHasLimit); - } - - if (subqueryTree->hasRecursive) - { - preconditionsSatisfied = false; - errorDetail = "Recursive queries are currently unsupported"; - } - - if (subqueryTree->cteList) - { - preconditionsSatisfied = false; - errorDetail = "Common Table Expressions are currently unsupported"; - } - - if (subqueryTree->hasForUpdate) - { - preconditionsSatisfied = false; - errorDetail = "For Update/Share commands are currently unsupported"; - } - - /* group clause list must include partition column */ - if (subqueryTree->groupClause) - { - List *groupClauseList = subqueryTree->groupClause; - List *targetEntryList = subqueryTree->targetList; - List *groupTargetEntryList = GroupTargetEntryList(groupClauseList, - targetEntryList); - bool groupOnPartitionColumn = TargetListOnPartitionColumn(subqueryTree, - groupTargetEntryList); - if (!groupOnPartitionColumn) - { - preconditionsSatisfied = false; - errorDetail = "Group by list without partition column is currently " - "unsupported"; - } - } - - /* we don't support aggregates without group by */ - if (subqueryTree->hasAggs && (subqueryTree->groupClause == NULL)) - { - preconditionsSatisfied = false; - errorDetail = "Aggregates without group by are currently unsupported"; - } - - /* having clause without group by on partition column is not supported */ - if (subqueryTree->havingQual && (subqueryTree->groupClause == NULL)) - { - preconditionsSatisfied = false; - errorDetail = "Having qual without group by on partition column is " - "currently unsupported"; - } - - /* distinct clause list must include partition column */ - if (subqueryTree->distinctClause) - { - List *distinctClauseList = subqueryTree->distinctClause; - List *targetEntryList = subqueryTree->targetList; - List *distinctTargetEntryList = GroupTargetEntryList(distinctClauseList, - targetEntryList); - bool distinctOnPartitionColumn = - TargetListOnPartitionColumn(subqueryTree, distinctTargetEntryList); - if (!distinctOnPartitionColumn) - { - preconditionsSatisfied = false; - errorDetail = "Distinct on columns without partition column is " - "currently unsupported"; - } - } - - /* finally check and error out if not satisfied */ - if (!preconditionsSatisfied) - { - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("cannot push down this subquery"), - errdetail("%s", errorDetail))); - } - - /* recursively do same check for subqueries of this query */ - subqueryEntryList = SubqueryEntryList(subqueryTree); - foreach(rangeTableEntryCell, subqueryEntryList) - { - RangeTblEntry *rangeTableEntry = - (RangeTblEntry *) lfirst(rangeTableEntryCell); - - Query *innerSubquery = rangeTableEntry->subquery; - ErrorIfCannotPushdownSubquery(innerSubquery, outerQueryHasLimit); - } -} - - -/* - * ErrorIfUnsupportedSetOperation is a helper function for ErrorIfCannotPushdownSubquery(). - * It basically iterates over the subqueries that reside under the given set operations. - * - * The function also errors out for set operations INTERSECT and EXCEPT. - */ -static void -ErrorIfUnsupportedSetOperation(Query *subqueryTree, bool outerQueryHasLimit) -{ - List *rangeTableList = subqueryTree->rtable; - List *rangeTableIndexList = NIL; - ListCell *rangeTableIndexCell = NULL; - List *setOperationStatementList = NIL; - ListCell *setOperationStatmentCell = NULL; - - ExtractSetOperationStatmentWalker((Node *) subqueryTree->setOperations, - &setOperationStatementList); - foreach(setOperationStatmentCell, setOperationStatementList) - { - SetOperationStmt *setOperation = - (SetOperationStmt *) lfirst(setOperationStatmentCell); - - if (setOperation->op != SETOP_UNION) - { - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("cannot push down this subquery"), - errdetail("Intersect and Except are currently unsupported"))); - } - } - - ExtractRangeTableIndexWalker((Node *) subqueryTree->setOperations, - &rangeTableIndexList); - foreach(rangeTableIndexCell, rangeTableIndexList) - { - int rangeTableIndex = lfirst_int(rangeTableIndexCell); - RangeTblEntry *rangeTableEntry = rt_fetch(rangeTableIndex, rangeTableList); - - Assert(rangeTableEntry->rtekind == RTE_SUBQUERY); - - ErrorIfCannotPushdownSubquery(rangeTableEntry->subquery, outerQueryHasLimit); - } -} - - -/* - * ExtractSetOperationStatementWalker walks over a set operations statment, - * and finds all set operations in the tree. - */ -static bool -ExtractSetOperationStatmentWalker(Node *node, List **setOperationList) -{ - bool walkerResult = false; - if (node == NULL) - { - return false; - } - - if (IsA(node, SetOperationStmt)) - { - SetOperationStmt *setOperation = (SetOperationStmt *) node; - - (*setOperationList) = lappend(*setOperationList, setOperation); - } - - walkerResult = expression_tree_walker(node, ExtractSetOperationStatmentWalker, - setOperationList); - - return walkerResult; -} - - -/* - * ErrorIfUnsupportedTableCombination checks if the given query tree contains any - * unsupported range table combinations. For this, the function walks over all - * range tables in the join tree, and checks if they correspond to simple relations - * or subqueries. - */ -static void -ErrorIfUnsupportedTableCombination(Query *queryTree) -{ - List *rangeTableList = queryTree->rtable; - List *joinTreeTableIndexList = NIL; - ListCell *joinTreeTableIndexCell = NULL; - bool unsupporteTableCombination = false; - char *errorDetail = NULL; - - /* - * Extract all range table indexes from the join tree. Note that sub-queries - * that get pulled up by PostgreSQL don't appear in this join tree. - */ - ExtractRangeTableIndexWalker((Node *) queryTree->jointree, &joinTreeTableIndexList); - foreach(joinTreeTableIndexCell, joinTreeTableIndexList) - { - /* - * Join tree's range table index starts from 1 in the query tree. But, - * list indexes start from 0. - */ - int joinTreeTableIndex = lfirst_int(joinTreeTableIndexCell); - int rangeTableListIndex = joinTreeTableIndex - 1; - - RangeTblEntry *rangeTableEntry = - (RangeTblEntry *) list_nth(rangeTableList, rangeTableListIndex); - - /* - * Check if the range table in the join tree is a simple relation or a - * subquery. - */ - if (rangeTableEntry->rtekind != RTE_RELATION && - rangeTableEntry->rtekind != RTE_SUBQUERY) - { - unsupporteTableCombination = true; - errorDetail = "Table expressions other than simple relations and " - "subqueries are currently unsupported"; - break; - } - } - - /* finally check and error out if not satisfied */ - if (unsupporteTableCombination) - { - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("cannot push down this subquery"), - errdetail("%s", errorDetail))); - } -} - - /* * GroupTargetEntryList walks over group clauses in the given list, finds * matching target entries and return them in a new list. @@ -3216,53 +2823,6 @@ GroupTargetEntryList(List *groupClauseList, List *targetEntryList) } -/* - * TargetListOnPartitionColumn checks if at least one target list entry is on - * partition column. - */ -static bool -TargetListOnPartitionColumn(Query *query, List *targetEntryList) -{ - bool targetListOnPartitionColumn = false; - List *compositeFieldList = NIL; - - ListCell *targetEntryCell = NULL; - foreach(targetEntryCell, targetEntryList) - { - TargetEntry *targetEntry = (TargetEntry *) lfirst(targetEntryCell); - Expr *targetExpression = targetEntry->expr; - - bool isPartitionColumn = IsPartitionColumn(targetExpression, query); - if (isPartitionColumn) - { - FieldSelect *compositeField = CompositeFieldRecursive(targetExpression, - query); - if (compositeField) - { - compositeFieldList = lappend(compositeFieldList, compositeField); - } - else - { - targetListOnPartitionColumn = true; - break; - } - } - } - - /* check composite fields */ - if (!targetListOnPartitionColumn) - { - bool fullCompositeFieldList = FullCompositeFieldList(compositeFieldList); - if (fullCompositeFieldList) - { - targetListOnPartitionColumn = true; - } - } - - return targetListOnPartitionColumn; -} - - /* * IsPartitionColumn returns true if the given column is a partition column. * The function uses FindReferencedTableColumn to find the original relation @@ -3416,265 +2976,9 @@ FindReferencedTableColumn(Expr *columnExpression, List *parentQueryList, Query * } -/* - * CompositeFieldRecursive recursively finds composite field in the query tree - * referred by given expression. If expression does not refer to a composite - * field, then it returns NULL. - * - * If expression is a field select we directly return composite field. If it is - * a column is referenced from a subquery, then we recursively check that subquery - * until we reach the source of that column, and find composite field. If this - * column is referenced from join range table entry, then we resolve which join - * column it refers and recursively use this column with the same query. - */ -static FieldSelect * -CompositeFieldRecursive(Expr *expression, Query *query) -{ - FieldSelect *compositeField = NULL; - List *rangetableList = query->rtable; - Index rangeTableEntryIndex = 0; - RangeTblEntry *rangeTableEntry = NULL; - Var *candidateColumn = NULL; - - if (IsA(expression, FieldSelect)) - { - compositeField = (FieldSelect *) expression; - return compositeField; - } - - if (IsA(expression, Var)) - { - candidateColumn = (Var *) expression; - } - else - { - return NULL; - } - - rangeTableEntryIndex = candidateColumn->varno - 1; - rangeTableEntry = list_nth(rangetableList, rangeTableEntryIndex); - - if (rangeTableEntry->rtekind == RTE_SUBQUERY) - { - Query *subquery = rangeTableEntry->subquery; - List *targetEntryList = subquery->targetList; - AttrNumber targetEntryIndex = candidateColumn->varattno - 1; - TargetEntry *subqueryTargetEntry = list_nth(targetEntryList, targetEntryIndex); - - Expr *subqueryExpression = subqueryTargetEntry->expr; - compositeField = CompositeFieldRecursive(subqueryExpression, subquery); - } - else if (rangeTableEntry->rtekind == RTE_JOIN) - { - List *joinColumnList = rangeTableEntry->joinaliasvars; - AttrNumber joinColumnIndex = candidateColumn->varattno - 1; - Expr *joinColumn = list_nth(joinColumnList, joinColumnIndex); - - compositeField = CompositeFieldRecursive(joinColumn, query); - } - - return compositeField; -} - - -/* - * FullCompositeFieldList gets a composite field list, and checks if all fields - * of composite type are used in the list. - */ -static bool -FullCompositeFieldList(List *compositeFieldList) -{ - bool fullCompositeFieldList = true; - bool *compositeFieldArray = NULL; - uint32 compositeFieldCount = 0; - uint32 fieldIndex = 0; - - ListCell *fieldSelectCell = NULL; - foreach(fieldSelectCell, compositeFieldList) - { - FieldSelect *fieldSelect = (FieldSelect *) lfirst(fieldSelectCell); - uint32 compositeFieldIndex = 0; - - Expr *fieldExpression = fieldSelect->arg; - if (!IsA(fieldExpression, Var)) - { - continue; - } - - if (compositeFieldArray == NULL) - { - uint32 index = 0; - Var *compositeColumn = (Var *) fieldExpression; - Oid compositeTypeId = compositeColumn->vartype; - Oid compositeRelationId = get_typ_typrelid(compositeTypeId); - - /* get composite type attribute count */ - Relation relation = relation_open(compositeRelationId, AccessShareLock); - compositeFieldCount = relation->rd_att->natts; - compositeFieldArray = palloc0(compositeFieldCount * sizeof(bool)); - relation_close(relation, AccessShareLock); - - for (index = 0; index < compositeFieldCount; index++) - { - compositeFieldArray[index] = false; - } - } - - compositeFieldIndex = fieldSelect->fieldnum - 1; - compositeFieldArray[compositeFieldIndex] = true; - } - - for (fieldIndex = 0; fieldIndex < compositeFieldCount; fieldIndex++) - { - if (!compositeFieldArray[fieldIndex]) - { - fullCompositeFieldList = false; - } - } - - if (compositeFieldCount == 0) - { - fullCompositeFieldList = false; - } - - return fullCompositeFieldList; -} - - -/* - * ErrorIfUnsupportedShardDistribution gets list of relations in the given query - * and checks if two conditions below hold for them, otherwise it errors out. - * a. Every relation is distributed by range or hash. This means shards are - * disjoint based on the partition column. - * b. All relations have 1-to-1 shard partitioning between them. This means - * shard count for every relation is same and for every shard in a relation - * there is exactly one shard in other relations with same min/max values. - */ -static void -ErrorIfUnsupportedShardDistribution(Query *query) -{ - Oid firstTableRelationId = InvalidOid; - List *relationIdList = RelationIdList(query); - ListCell *relationIdCell = NULL; - uint32 relationIndex = 0; - uint32 rangeDistributedRelationCount = 0; - uint32 hashDistributedRelationCount = 0; - - foreach(relationIdCell, relationIdList) - { - Oid relationId = lfirst_oid(relationIdCell); - char partitionMethod = PartitionMethod(relationId); - if (partitionMethod == DISTRIBUTE_BY_RANGE) - { - rangeDistributedRelationCount++; - } - else if (partitionMethod == DISTRIBUTE_BY_HASH) - { - hashDistributedRelationCount++; - } - else - { - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("cannot push down this subquery"), - errdetail("Currently range and hash partitioned " - "relations are supported"))); - } - } - - if ((rangeDistributedRelationCount > 0) && (hashDistributedRelationCount > 0)) - { - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("cannot push down this subquery"), - errdetail("A query including both range and hash " - "partitioned relations are unsupported"))); - } - - foreach(relationIdCell, relationIdList) - { - Oid relationId = lfirst_oid(relationIdCell); - bool coPartitionedTables = false; - Oid currentRelationId = relationId; - - /* get shard list of first relation and continue for the next relation */ - if (relationIndex == 0) - { - firstTableRelationId = relationId; - relationIndex++; - - continue; - } - - /* check if this table has 1-1 shard partitioning with first table */ - coPartitionedTables = CoPartitionedTables(firstTableRelationId, - currentRelationId); - if (!coPartitionedTables) - { - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("cannot push down this subquery"), - errdetail("Shards of relations in subquery need to " - "have 1-to-1 shard partitioning"))); - } - } -} - - -/* - * RelationIdList returns list of unique relation ids in query tree. - */ -static List * -RelationIdList(Query *query) -{ - List *rangeTableList = NIL; - List *tableEntryList = NIL; - List *relationIdList = NIL; - ListCell *tableEntryCell = NULL; - - ExtractRangeTableRelationWalker((Node *) query, &rangeTableList); - tableEntryList = TableEntryList(rangeTableList); - - foreach(tableEntryCell, tableEntryList) - { - TableEntry *tableEntry = (TableEntry *) lfirst(tableEntryCell); - Oid relationId = tableEntry->relationId; - - relationIdList = list_append_unique_oid(relationIdList, relationId); - } - - return relationIdList; -} - - /* * CoPartitionedTables checks if given two distributed tables have 1-to-1 * shard partitioning. - */ -static bool -CoPartitionedTables(Oid firstRelationId, Oid secondRelationId) -{ - bool coPartitionedTables = true; - uint32 intervalIndex = 0; - DistTableCacheEntry *firstTableCache = DistributedTableCacheEntry(firstRelationId); - DistTableCacheEntry *secondTableCache = DistributedTableCacheEntry(secondRelationId); - ShardInterval **sortedFirstIntervalArray = firstTableCache->sortedShardIntervalArray; - ShardInterval **sortedSecondIntervalArray = - secondTableCache->sortedShardIntervalArray; - uint32 firstListShardCount = firstTableCache->shardIntervalArrayLength; - uint32 secondListShardCount = secondTableCache->shardIntervalArrayLength; - FmgrInfo *comparisonFunction = firstTableCache->shardIntervalCompareFunction; - - if (firstListShardCount != secondListShardCount) - { - return false; - } - - /* if there are not any shards just return true */ - if (firstListShardCount == 0) - { - return true; - } - - Assert(comparisonFunction != NULL); - /* * Check if the tables have the same colocation ID - if so, we know * they're colocated. @@ -3691,126 +2995,6 @@ CoPartitionedTables(Oid firstRelationId, Oid secondRelationId) * interval minimum values. Then it compares every shard interval in order * and if any pair of shard intervals are not equal it returns false. */ - for (intervalIndex = 0; intervalIndex < firstListShardCount; intervalIndex++) - { - ShardInterval *firstInterval = sortedFirstIntervalArray[intervalIndex]; - ShardInterval *secondInterval = sortedSecondIntervalArray[intervalIndex]; - - bool shardIntervalsEqual = ShardIntervalsEqual(comparisonFunction, - firstInterval, - secondInterval); - if (!shardIntervalsEqual) - { - coPartitionedTables = false; - break; - } - } - - return coPartitionedTables; -} - - -/* - * ShardIntervalsEqual checks if given shard intervals have equal min/max values. - */ -static bool -ShardIntervalsEqual(FmgrInfo *comparisonFunction, ShardInterval *firstInterval, - ShardInterval *secondInterval) -{ - bool shardIntervalsEqual = false; - Datum firstMin = 0; - Datum firstMax = 0; - Datum secondMin = 0; - Datum secondMax = 0; - - firstMin = firstInterval->minValue; - firstMax = firstInterval->maxValue; - secondMin = secondInterval->minValue; - secondMax = secondInterval->maxValue; - - if (firstInterval->minValueExists && firstInterval->maxValueExists && - secondInterval->minValueExists && secondInterval->maxValueExists) - { - Datum minDatum = CompareCall2(comparisonFunction, firstMin, secondMin); - Datum maxDatum = CompareCall2(comparisonFunction, firstMax, secondMax); - int firstComparison = DatumGetInt32(minDatum); - int secondComparison = DatumGetInt32(maxDatum); - - if (firstComparison == 0 && secondComparison == 0) - { - shardIntervalsEqual = true; - } - } - - return shardIntervalsEqual; -} - - -/* - * ErrorIfUnsupportedFilters checks if all leaf queries in the given query have - * same filter on the partition column. Note that if there are queries without - * any filter on the partition column, they don't break this prerequisite. - */ -static void -ErrorIfUnsupportedFilters(Query *subquery) -{ - List *queryList = NIL; - ListCell *queryCell = NULL; - List *subqueryOpExpressionList = NIL; - List *relationIdList = RelationIdList(subquery); - - /* - * Get relation id of any relation in the subquery and create partiton column - * for this relation. We will use this column to replace columns on operator - * expressions on different tables. Then we compare these operator expressions - * to see if they consist of same operator and constant value. - */ - Oid relationId = linitial_oid(relationIdList); - Var *partitionColumn = PartitionColumn(relationId, 0); - - ExtractQueryWalker((Node *) subquery, &queryList); - foreach(queryCell, queryList) - { - Query *query = (Query *) lfirst(queryCell); - List *opExpressionList = NIL; - List *newOpExpressionList = NIL; - - bool leafQuery = LeafQuery(query); - if (!leafQuery) - { - continue; - } - - opExpressionList = PartitionColumnOpExpressionList(query); - if (opExpressionList == NIL) - { - continue; - } - - newOpExpressionList = ReplaceColumnsInOpExpressionList(opExpressionList, - partitionColumn); - - if (subqueryOpExpressionList == NIL) - { - subqueryOpExpressionList = newOpExpressionList; - } - else - { - bool equalOpExpressionLists = EqualOpExpressionLists(subqueryOpExpressionList, - newOpExpressionList); - if (!equalOpExpressionLists) - { - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("cannot push down this subquery"), - errdetail("Currently all leaf queries need to " - "have same filters on partition column"))); - } - } - } -} - - -/* * ExtractQueryWalker walks over a query, and finds all queries in the query * tree and returns these queries. */ @@ -4008,51 +3192,6 @@ ReplaceColumnsInOpExpressionList(List *opExpressionList, Var *newColumn) } -/* - * EqualOpExpressionLists checks if given two operator expression lists are - * equal. - */ -static bool -EqualOpExpressionLists(List *firstOpExpressionList, List *secondOpExpressionList) -{ - bool equalOpExpressionLists = false; - ListCell *firstOpExpressionCell = NULL; - uint32 equalOpExpressionCount = 0; - uint32 firstOpExpressionCount = list_length(firstOpExpressionList); - uint32 secondOpExpressionCount = list_length(secondOpExpressionList); - - if (firstOpExpressionCount != secondOpExpressionCount) - { - return false; - } - - foreach(firstOpExpressionCell, firstOpExpressionList) - { - OpExpr *firstOpExpression = (OpExpr *) lfirst(firstOpExpressionCell); - ListCell *secondOpExpressionCell = NULL; - - foreach(secondOpExpressionCell, secondOpExpressionList) - { - OpExpr *secondOpExpression = (OpExpr *) lfirst(secondOpExpressionCell); - bool equalExpressions = equal(firstOpExpression, secondOpExpression); - - if (equalExpressions) - { - equalOpExpressionCount++; - continue; - } - } - } - - if (equalOpExpressionCount == firstOpExpressionCount) - { - equalOpExpressionLists = true; - } - - return equalOpExpressionLists; -} - - /* * WorkerLimitCount checks if the given extended node contains a limit node, and * if that node can be pushed down. For this, the function checks if this limit diff --git a/src/backend/distributed/planner/multi_logical_planner.c b/src/backend/distributed/planner/multi_logical_planner.c index 9e590d7e6..a25fcb149 100644 --- a/src/backend/distributed/planner/multi_logical_planner.c +++ b/src/backend/distributed/planner/multi_logical_planner.c @@ -14,6 +14,7 @@ #include "postgres.h" +#include "access/heapam.h" #include "access/nbtree.h" #include "catalog/pg_am.h" #include "catalog/pg_class.h" @@ -23,6 +24,7 @@ #include "distributed/multi_logical_optimizer.h" #include "distributed/multi_logical_planner.h" #include "distributed/multi_physical_planner.h" +#include "distributed/relation_restriction_equivalence.h" #include "distributed/worker_protocol.h" #include "nodes/makefuncs.h" #include "nodes/nodeFuncs.h" @@ -34,6 +36,8 @@ #include "utils/datum.h" #include "utils/lsyscache.h" #include "utils/syscache.h" +#include "utils/rel.h" +#include "utils/relcache.h" /* Config variable managed via guc.c */ @@ -56,11 +60,30 @@ typedef MultiNode *(*RuleApplyFunction) (MultiNode *leftNode, MultiNode *rightNo static RuleApplyFunction RuleApplyFunctionArray[JOIN_RULE_LAST] = { 0 }; /* join rules */ /* Local functions forward declarations */ +static bool SingleRelationRepartitionSubquery(Query *queryTree); +static DeferredErrorMessage * DeferErrorIfUnsupportedSubqueryPushdown(Query * + originalQuery, + PlannerRestrictionContext + * + plannerRestrictionContext); +static DeferredErrorMessage * DeferErrorIfUnsupportedFilters(Query *subquery); +static bool EqualOpExpressionLists(List *firstOpExpressionList, + List *secondOpExpressionList); +static DeferredErrorMessage * DeferErrorIfCannotPushdownSubquery(Query *subqueryTree, + bool outerQueryHasLimit); +static DeferredErrorMessage * DeferErrorIfUnsupportedUnionQuery(Query *queryTree, + bool outerQueryHasLimit); +static bool ExtractSetOperationStatmentWalker(Node *node, List **setOperationList); +static DeferredErrorMessage * DeferErrorIfUnsupportedTableCombination(Query *queryTree); +static bool TargetListOnPartitionColumn(Query *query, List *targetEntryList); +static FieldSelect * CompositeFieldRecursive(Expr *expression, Query *query); +static bool FullCompositeFieldList(List *compositeFieldList); static MultiNode * MultiPlanTree(Query *queryTree); static void ErrorIfQueryNotSupported(Query *queryTree); static bool HasUnsupportedJoinWalker(Node *node, void *context); static bool ErrorHintRequired(const char *errorHint, Query *queryTree); -static void ErrorIfSubqueryNotSupported(Query *subqueryTree); +static DeferredErrorMessage * DeferErrorIfUnsupportedSubqueryRepartition(Query * + subqueryTree); static bool HasTablesample(Query *queryTree); static bool HasOuterJoin(Query *queryTree); static bool HasOuterJoinWalker(Node *node, void *maxJoinLevel); @@ -104,9 +127,12 @@ static MultiNode * ApplyCartesianProduct(MultiNode *leftNode, MultiNode *rightNo * Local functions forward declarations for subquery pushdown. Note that these * functions will be removed with upcoming subqery changes. */ +static MultiNode * MultiSubqueryPlanTree(Query *originalQuery, + Query *queryTree, + PlannerRestrictionContext * + plannerRestrictionContext); static MultiNode * SubqueryPushdownMultiPlanTree(Query *queryTree); -static void ErrorIfSubqueryJoin(Query *queryTree); static List * CreateSubqueryTargetEntryList(List *columnList); static void UpdateVarMappingsForExtendedOpNode(List *columnList, List *subqueryTargetEntryList); @@ -118,9 +144,15 @@ static MultiTable * MultiSubqueryPushdownTable(Query *subquery); * query tree yield by the standard planner. It uses helper functions to create logical * plan and adds a root node to top of it. The original query is only used for subquery * pushdown planning. + * + * We also pass queryTree and plannerRestrictionContext to the planner. They + * are primarily used to decide whether the subquery is safe to pushdown. + * If not, it helps to produce meaningful error messages for subquery + * pushdown planning. */ MultiTreeRoot * -MultiLogicalPlanCreate(Query *originalQuery, Query *queryTree) +MultiLogicalPlanCreate(Query *originalQuery, Query *queryTree, + PlannerRestrictionContext *plannerRestrictionContext) { MultiNode *multiQueryNode = NULL; MultiTreeRoot *rootNode = NULL; @@ -134,15 +166,8 @@ MultiLogicalPlanCreate(Query *originalQuery, Query *queryTree) subqueryEntryList = SubqueryEntryList(queryTree); if (subqueryEntryList != NIL) { - if (SubqueryPushdown) - { - multiQueryNode = SubqueryPushdownMultiPlanTree(originalQuery); - } - else - { - ErrorIfSubqueryJoin(queryTree); - multiQueryNode = MultiPlanTree(queryTree); - } + multiQueryNode = MultiSubqueryPlanTree(originalQuery, queryTree, + plannerRestrictionContext); } else { @@ -157,6 +182,826 @@ MultiLogicalPlanCreate(Query *originalQuery, Query *queryTree) } +/* + * MultiSubqueryPlanTree gets the query objects and returns logical plan + * for subqueries. + * + * We currently have two different code paths for creating logic plan for subqueries: + * (i) subquery pushdown + * (ii) single relation repartition subquery + * + * In order to create the logical plan, we follow the algorithm below: + * - If subquery pushdown planner can plan the query + * - We're done, we create the multi plan tree and return + * - Else + * - If the query is not eligible for single table repartition subquery planning + * - Throw the error that the subquery pushdown planner generated + * - If it is eligible for single table repartition subquery planning + * - Check for the errors for single table repartition subquery planning + * - If no errors found, we're done. Create the multi plan and return + * - If found errors, throw it + */ +static MultiNode * +MultiSubqueryPlanTree(Query *originalQuery, Query *queryTree, + PlannerRestrictionContext *plannerRestrictionContext) +{ + MultiNode *multiQueryNode = NULL; + DeferredErrorMessage *subqueryPushdownError = NULL; + + /* + * This is a generic error check that applies to both subquery pushdown + * and single table repartition subquery. + */ + ErrorIfQueryNotSupported(originalQuery); + + /* + * In principle, we're first trying subquery pushdown planner. If it fails + * to create a logical plan, continue with trying the single table + * repartition subquery planning. + */ + subqueryPushdownError = DeferErrorIfUnsupportedSubqueryPushdown(originalQuery, + plannerRestrictionContext); + if (!subqueryPushdownError) + { + multiQueryNode = SubqueryPushdownMultiPlanTree(originalQuery); + } + else if (subqueryPushdownError) + { + bool singleRelationRepartitionSubquery = false; + RangeTblEntry *subqueryRangeTableEntry = NULL; + Query *subqueryTree = NULL; + DeferredErrorMessage *repartitionQueryError = NULL; + List *subqueryEntryList = NULL; + + /* + * If not eligible for single relation repartition query, we should raise + * subquery pushdown error. + */ + singleRelationRepartitionSubquery = + SingleRelationRepartitionSubquery(originalQuery); + if (!singleRelationRepartitionSubquery) + { + RaiseDeferredErrorInternal(subqueryPushdownError, ERROR); + } + + subqueryEntryList = SubqueryEntryList(queryTree); + subqueryRangeTableEntry = (RangeTblEntry *) linitial(subqueryEntryList); + Assert(subqueryRangeTableEntry->rtekind == RTE_SUBQUERY); + + subqueryTree = subqueryRangeTableEntry->subquery; + + repartitionQueryError = DeferErrorIfUnsupportedSubqueryRepartition(subqueryTree); + if (repartitionQueryError) + { + RaiseDeferredErrorInternal(repartitionQueryError, ERROR); + } + + /* all checks has passed, safe to create the multi plan */ + multiQueryNode = MultiPlanTree(queryTree); + } + + Assert(multiQueryNode != NULL); + + return multiQueryNode; +} + + +/* + * SingleRelationRepartitionSubquery returns true if it is eligible single + * repartition query planning in the sense that: + * - None of the levels of the subquery contains a join + * - Only a single RTE_RELATION exists, which means only a single table + * name is specified on the whole query + * - No sublinks exists in the subquery + * + * Note that the caller should still call DeferErrorIfUnsupportedSubqueryRepartition() + * to ensure that Citus supports the subquery. Also, this function is designed to run + * on the original query. + */ +static bool +SingleRelationRepartitionSubquery(Query *queryTree) +{ + List *rangeTableIndexList = NULL; + RangeTblEntry *rangeTableEntry = NULL; + List *rangeTableList = queryTree->rtable; + int rangeTableIndex = 0; + + /* we don't support subqueries in WHERE */ + if (queryTree->hasSubLinks) + { + return false; + } + + /* + * Don't allow joins and set operations. If join appears in the queryTree, the + * length would be greater than 1. If only set operations exists, the length + * would be 0. + */ + ExtractRangeTableIndexWalker((Node *) queryTree->jointree, + &rangeTableIndexList); + if (list_length(rangeTableIndexList) != 1) + { + return false; + } + + rangeTableIndex = linitial_int(rangeTableIndexList); + rangeTableEntry = rt_fetch(rangeTableIndex, rangeTableList); + if (rangeTableEntry->rtekind == RTE_RELATION) + { + return true; + } + else if (rangeTableEntry->rtekind == RTE_SUBQUERY) + { + Query *subqueryTree = rangeTableEntry->subquery; + + return SingleRelationRepartitionSubquery(subqueryTree); + } + + return false; +} + + +/* + * DeferErrorIfContainsUnsupportedSubqueryPushdown iterates on the query's subquery + * entry list and uses helper functions to check if we can push down subquery + * to worker nodes. These helper functions returns a deferred error if we + * cannot push down the subquery. + */ +static DeferredErrorMessage * +DeferErrorIfUnsupportedSubqueryPushdown(Query *originalQuery, + PlannerRestrictionContext * + plannerRestrictionContext) +{ + ListCell *rangeTableEntryCell = NULL; + List *subqueryEntryList = NIL; + bool outerQueryHasLimit = false; + DeferredErrorMessage *error = NULL; + RelationRestrictionContext *relationRestrictionContext = + plannerRestrictionContext->relationRestrictionContext; + + if (originalQuery->limitCount != NULL) + { + outerQueryHasLimit = true; + } + + /* + * We're checking two things here: + * (i) If the query contains a top level union, ensure that all leaves + * return the partition key at the same position + * (ii) Else, check whether all relations joined on the partition key or not + */ + if (ContainsUnionSubquery(originalQuery)) + { + if (!SafeToPushdownUnionSubquery(relationRestrictionContext)) + { + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "cannot pushdown the subquery since all leaves of " + "the UNION does not include partition key at the " + "same position", + "Each leaf query of the UNION should return " + "partition key at the same position on its " + "target list.", NULL); + } + } + else if (!RestrictionEquivalenceForPartitionKeys(plannerRestrictionContext)) + { + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "cannot pushdown the subquery since all relations are not " + "joined using distribution keys", + "Each relation should be joined with at least " + "one another relation using distribution keys and " + "equality operator.", NULL); + } + + subqueryEntryList = SubqueryEntryList(originalQuery); + foreach(rangeTableEntryCell, subqueryEntryList) + { + RangeTblEntry *rangeTableEntry = lfirst(rangeTableEntryCell); + Query *subquery = rangeTableEntry->subquery; + + error = DeferErrorIfCannotPushdownSubquery(subquery, outerQueryHasLimit); + if (error) + { + return error; + } + + error = DeferErrorIfUnsupportedFilters(subquery); + if (error) + { + return error; + } + } + + return NULL; +} + + +/* + * DeferErrorIfUnsupportedFilters checks if all leaf queries in the given query have + * same filter on the partition column. Note that if there are queries without + * any filter on the partition column, they don't break this prerequisite. + */ +static DeferredErrorMessage * +DeferErrorIfUnsupportedFilters(Query *subquery) +{ + List *queryList = NIL; + ListCell *queryCell = NULL; + List *subqueryOpExpressionList = NIL; + List *relationIdList = RelationIdList(subquery); + + /* + * Get relation id of any relation in the subquery and create partiton column + * for this relation. We will use this column to replace columns on operator + * expressions on different tables. Then we compare these operator expressions + * to see if they consist of same operator and constant value. + */ + Oid relationId = linitial_oid(relationIdList); + Var *partitionColumn = PartitionColumn(relationId, 0); + + ExtractQueryWalker((Node *) subquery, &queryList); + foreach(queryCell, queryList) + { + Query *query = (Query *) lfirst(queryCell); + List *opExpressionList = NIL; + List *newOpExpressionList = NIL; + + bool leafQuery = LeafQuery(query); + if (!leafQuery) + { + continue; + } + + opExpressionList = PartitionColumnOpExpressionList(query); + if (opExpressionList == NIL) + { + continue; + } + + newOpExpressionList = ReplaceColumnsInOpExpressionList(opExpressionList, + partitionColumn); + + if (subqueryOpExpressionList == NIL) + { + subqueryOpExpressionList = newOpExpressionList; + } + else + { + bool equalOpExpressionLists = EqualOpExpressionLists(subqueryOpExpressionList, + newOpExpressionList); + if (!equalOpExpressionLists) + { + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "cannot push down this subquery", + "Currently all leaf queries need to " + "have same filters on partition column", NULL); + } + } + } + + return NULL; +} + + +/* + * EqualOpExpressionLists checks if given two operator expression lists are + * equal. + */ +static bool +EqualOpExpressionLists(List *firstOpExpressionList, List *secondOpExpressionList) +{ + bool equalOpExpressionLists = false; + ListCell *firstOpExpressionCell = NULL; + uint32 equalOpExpressionCount = 0; + uint32 firstOpExpressionCount = list_length(firstOpExpressionList); + uint32 secondOpExpressionCount = list_length(secondOpExpressionList); + + if (firstOpExpressionCount != secondOpExpressionCount) + { + return false; + } + + foreach(firstOpExpressionCell, firstOpExpressionList) + { + OpExpr *firstOpExpression = (OpExpr *) lfirst(firstOpExpressionCell); + ListCell *secondOpExpressionCell = NULL; + + foreach(secondOpExpressionCell, secondOpExpressionList) + { + OpExpr *secondOpExpression = (OpExpr *) lfirst(secondOpExpressionCell); + bool equalExpressions = equal(firstOpExpression, secondOpExpression); + + if (equalExpressions) + { + equalOpExpressionCount++; + continue; + } + } + } + + if (equalOpExpressionCount == firstOpExpressionCount) + { + equalOpExpressionLists = true; + } + + return equalOpExpressionLists; +} + + +/* + * DeferErrorIfCannotPushdownSubquery recursively checks if we can push down the given + * subquery to worker nodes. If we cannot push down the subquery, this function + * returns a deferred error. + * + * We can push down a subquery if it follows rules below. We support nested queries + * as long as they follow the same rules, and we recurse to validate each subquery + * for this given query. + * a. If there is an aggregate, it must be grouped on partition column. + * b. If there is a join, it must be between two regular tables or two subqueries. + * We don't support join between a regular table and a subquery. And columns on + * the join condition must be partition columns. + * c. If there is a distinct clause, it must be on the partition column. + * + * This function is very similar to ErrorIfQueryNotSupported() in logical + * planner, but we don't reuse it, because differently for subqueries we support + * a subset of distinct, union and left joins. + * + * Note that this list of checks is not exhaustive, there can be some cases + * which we let subquery to run but returned results would be wrong. Such as if + * a subquery has a group by on another subquery which includes order by with + * limit, we let this query to run, but results could be wrong depending on the + * features of underlying tables. + */ +static DeferredErrorMessage * +DeferErrorIfCannotPushdownSubquery(Query *subqueryTree, bool outerQueryHasLimit) +{ + bool preconditionsSatisfied = true; + char *errorDetail = NULL; + List *subqueryEntryList = NIL; + ListCell *rangeTableEntryCell = NULL; + DeferredErrorMessage *deferredError = NULL; + + deferredError = DeferErrorIfUnsupportedTableCombination(subqueryTree); + if (deferredError) + { + return deferredError; + } + + if (subqueryTree->hasSubLinks) + { + preconditionsSatisfied = false; + errorDetail = "Subqueries other than from-clause subqueries are unsupported"; + } + + if (subqueryTree->rtable == NIL) + { + preconditionsSatisfied = false; + errorDetail = "Subqueries without relations are unsupported"; + } + + if (subqueryTree->hasWindowFuncs) + { + preconditionsSatisfied = false; + errorDetail = "Window functions are currently unsupported"; + } + + if (subqueryTree->limitOffset) + { + preconditionsSatisfied = false; + errorDetail = "Offset clause is currently unsupported"; + } + + if (subqueryTree->limitCount && !outerQueryHasLimit) + { + preconditionsSatisfied = false; + errorDetail = "Limit in subquery without limit in the outer query is unsupported"; + } + + if (subqueryTree->setOperations) + { + deferredError = DeferErrorIfUnsupportedUnionQuery(subqueryTree, + outerQueryHasLimit); + if (deferredError) + { + return deferredError; + } + } + + if (subqueryTree->hasRecursive) + { + preconditionsSatisfied = false; + errorDetail = "Recursive queries are currently unsupported"; + } + + if (subqueryTree->cteList) + { + preconditionsSatisfied = false; + errorDetail = "Common Table Expressions are currently unsupported"; + } + + if (subqueryTree->hasForUpdate) + { + preconditionsSatisfied = false; + errorDetail = "For Update/Share commands are currently unsupported"; + } + + /* group clause list must include partition column */ + if (subqueryTree->groupClause) + { + List *groupClauseList = subqueryTree->groupClause; + List *targetEntryList = subqueryTree->targetList; + List *groupTargetEntryList = GroupTargetEntryList(groupClauseList, + targetEntryList); + bool groupOnPartitionColumn = TargetListOnPartitionColumn(subqueryTree, + groupTargetEntryList); + if (!groupOnPartitionColumn) + { + preconditionsSatisfied = false; + errorDetail = "Group by list without partition column is currently " + "unsupported"; + } + } + + /* we don't support aggregates without group by */ + if (subqueryTree->hasAggs && (subqueryTree->groupClause == NULL)) + { + preconditionsSatisfied = false; + errorDetail = "Aggregates without group by are currently unsupported"; + } + + /* having clause without group by on partition column is not supported */ + if (subqueryTree->havingQual && (subqueryTree->groupClause == NULL)) + { + preconditionsSatisfied = false; + errorDetail = "Having qual without group by on partition column is " + "currently unsupported"; + } + + /* distinct clause list must include partition column */ + if (subqueryTree->distinctClause) + { + List *distinctClauseList = subqueryTree->distinctClause; + List *targetEntryList = subqueryTree->targetList; + List *distinctTargetEntryList = GroupTargetEntryList(distinctClauseList, + targetEntryList); + bool distinctOnPartitionColumn = + TargetListOnPartitionColumn(subqueryTree, distinctTargetEntryList); + if (!distinctOnPartitionColumn) + { + preconditionsSatisfied = false; + errorDetail = "Distinct on columns without partition column is " + "currently unsupported"; + } + } + + /* finally check and return deferred if not satisfied */ + if (!preconditionsSatisfied) + { + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "cannot push down this subquery", + errorDetail, NULL); + } + + /* recursively do same check for subqueries of this query */ + subqueryEntryList = SubqueryEntryList(subqueryTree); + foreach(rangeTableEntryCell, subqueryEntryList) + { + RangeTblEntry *rangeTableEntry = + (RangeTblEntry *) lfirst(rangeTableEntryCell); + + Query *innerSubquery = rangeTableEntry->subquery; + deferredError = DeferErrorIfCannotPushdownSubquery(innerSubquery, + outerQueryHasLimit); + if (deferredError) + { + return deferredError; + } + } + + return NULL; +} + + +/* + * DeferErrorIfUnsupportedUnionQuery is a helper function for ErrorIfCannotPushdownSubquery(). + * It basically iterates over the subqueries that reside under the given set operations. + * + * The function also errors out for set operations INTERSECT and EXCEPT. + */ +static DeferredErrorMessage * +DeferErrorIfUnsupportedUnionQuery(Query *subqueryTree, + bool outerQueryHasLimit) +{ + List *rangeTableIndexList = NIL; + ListCell *rangeTableIndexCell = NULL; + List *setOperationStatementList = NIL; + ListCell *setOperationStatmentCell = NULL; + List *rangeTableList = subqueryTree->rtable; + + ExtractSetOperationStatmentWalker((Node *) subqueryTree->setOperations, + &setOperationStatementList); + foreach(setOperationStatmentCell, setOperationStatementList) + { + SetOperationStmt *setOperation = + (SetOperationStmt *) lfirst(setOperationStatmentCell); + + if (setOperation->op != SETOP_UNION) + { + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "cannot push down this subquery", + "Intersect and Except are currently unsupported", NULL); + } + } + + ExtractRangeTableIndexWalker((Node *) subqueryTree->setOperations, + &rangeTableIndexList); + foreach(rangeTableIndexCell, rangeTableIndexList) + { + int rangeTableIndex = lfirst_int(rangeTableIndexCell); + RangeTblEntry *rangeTableEntry = rt_fetch(rangeTableIndex, rangeTableList); + DeferredErrorMessage *deferredError = NULL; + + Assert(rangeTableEntry->rtekind == RTE_SUBQUERY); + + deferredError = DeferErrorIfCannotPushdownSubquery(rangeTableEntry->subquery, + outerQueryHasLimit); + if (deferredError) + { + return deferredError; + } + } + + return NULL; +} + + +/* + * ExtractSetOperationStatementWalker walks over a set operations statment, + * and finds all set operations in the tree. + */ +static bool +ExtractSetOperationStatmentWalker(Node *node, List **setOperationList) +{ + bool walkerResult = false; + if (node == NULL) + { + return false; + } + + if (IsA(node, SetOperationStmt)) + { + SetOperationStmt *setOperation = (SetOperationStmt *) node; + + (*setOperationList) = lappend(*setOperationList, setOperation); + } + + walkerResult = expression_tree_walker(node, ExtractSetOperationStatmentWalker, + setOperationList); + + return walkerResult; +} + + +/* + * DeferErrorIfUnsupportedTableCombination checks if the given query tree contains any + * unsupported range table combinations. For this, the function walks over all + * range tables in the join tree, and checks if they correspond to simple relations + * or subqueries. It also checks if there is a join between a regular table and + * a subquery and if join is on more than two range table entries. If any error is found, + * a deferred error is returned. Else, NULL is returned. + */ +static DeferredErrorMessage * +DeferErrorIfUnsupportedTableCombination(Query *queryTree) +{ + List *rangeTableList = queryTree->rtable; + List *joinTreeTableIndexList = NIL; + ListCell *joinTreeTableIndexCell = NULL; + bool unsupporteTableCombination = false; + char *errorDetail = NULL; + uint32 relationRangeTableCount = 0; + uint32 subqueryRangeTableCount = 0; + + /* + * Extract all range table indexes from the join tree. Note that sub-queries + * that get pulled up by PostgreSQL don't appear in this join tree. + */ + ExtractRangeTableIndexWalker((Node *) queryTree->jointree, &joinTreeTableIndexList); + foreach(joinTreeTableIndexCell, joinTreeTableIndexList) + { + /* + * Join tree's range table index starts from 1 in the query tree. But, + * list indexes start from 0. + */ + int joinTreeTableIndex = lfirst_int(joinTreeTableIndexCell); + int rangeTableListIndex = joinTreeTableIndex - 1; + + RangeTblEntry *rangeTableEntry = + (RangeTblEntry *) list_nth(rangeTableList, rangeTableListIndex); + + /* + * Check if the range table in the join tree is a simple relation or a + * subquery. + */ + if (rangeTableEntry->rtekind == RTE_RELATION) + { + relationRangeTableCount++; + } + else if (rangeTableEntry->rtekind == RTE_SUBQUERY) + { + subqueryRangeTableCount++; + } + else + { + unsupporteTableCombination = true; + errorDetail = "Table expressions other than simple relations and " + "subqueries are currently unsupported"; + break; + } + } + + /* finally check and error out if not satisfied */ + if (unsupporteTableCombination) + { + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "cannot push down this subquery", + errorDetail, NULL); + } + + return NULL; +} + + +/* + * TargetListOnPartitionColumn checks if at least one target list entry is on + * partition column. + */ +static bool +TargetListOnPartitionColumn(Query *query, List *targetEntryList) +{ + bool targetListOnPartitionColumn = false; + List *compositeFieldList = NIL; + + ListCell *targetEntryCell = NULL; + foreach(targetEntryCell, targetEntryList) + { + TargetEntry *targetEntry = (TargetEntry *) lfirst(targetEntryCell); + Expr *targetExpression = targetEntry->expr; + + bool isPartitionColumn = IsPartitionColumn(targetExpression, query); + if (isPartitionColumn) + { + FieldSelect *compositeField = CompositeFieldRecursive(targetExpression, + query); + if (compositeField) + { + compositeFieldList = lappend(compositeFieldList, compositeField); + } + else + { + targetListOnPartitionColumn = true; + break; + } + } + } + + /* check composite fields */ + if (!targetListOnPartitionColumn) + { + bool fullCompositeFieldList = FullCompositeFieldList(compositeFieldList); + if (fullCompositeFieldList) + { + targetListOnPartitionColumn = true; + } + } + + return targetListOnPartitionColumn; +} + + +/* + * FullCompositeFieldList gets a composite field list, and checks if all fields + * of composite type are used in the list. + */ +static bool +FullCompositeFieldList(List *compositeFieldList) +{ + bool fullCompositeFieldList = true; + bool *compositeFieldArray = NULL; + uint32 compositeFieldCount = 0; + uint32 fieldIndex = 0; + + ListCell *fieldSelectCell = NULL; + foreach(fieldSelectCell, compositeFieldList) + { + FieldSelect *fieldSelect = (FieldSelect *) lfirst(fieldSelectCell); + uint32 compositeFieldIndex = 0; + + Expr *fieldExpression = fieldSelect->arg; + if (!IsA(fieldExpression, Var)) + { + continue; + } + + if (compositeFieldArray == NULL) + { + uint32 index = 0; + Var *compositeColumn = (Var *) fieldExpression; + Oid compositeTypeId = compositeColumn->vartype; + Oid compositeRelationId = get_typ_typrelid(compositeTypeId); + + /* get composite type attribute count */ + Relation relation = relation_open(compositeRelationId, AccessShareLock); + compositeFieldCount = relation->rd_att->natts; + compositeFieldArray = palloc0(compositeFieldCount * sizeof(bool)); + relation_close(relation, AccessShareLock); + + for (index = 0; index < compositeFieldCount; index++) + { + compositeFieldArray[index] = false; + } + } + + compositeFieldIndex = fieldSelect->fieldnum - 1; + compositeFieldArray[compositeFieldIndex] = true; + } + + for (fieldIndex = 0; fieldIndex < compositeFieldCount; fieldIndex++) + { + if (!compositeFieldArray[fieldIndex]) + { + fullCompositeFieldList = false; + } + } + + if (compositeFieldCount == 0) + { + fullCompositeFieldList = false; + } + + return fullCompositeFieldList; +} + + +/* + * CompositeFieldRecursive recursively finds composite field in the query tree + * referred by given expression. If expression does not refer to a composite + * field, then it returns NULL. + * + * If expression is a field select we directly return composite field. If it is + * a column is referenced from a subquery, then we recursively check that subquery + * until we reach the source of that column, and find composite field. If this + * column is referenced from join range table entry, then we resolve which join + * column it refers and recursively use this column with the same query. + */ +static FieldSelect * +CompositeFieldRecursive(Expr *expression, Query *query) +{ + FieldSelect *compositeField = NULL; + List *rangetableList = query->rtable; + Index rangeTableEntryIndex = 0; + RangeTblEntry *rangeTableEntry = NULL; + Var *candidateColumn = NULL; + + if (IsA(expression, FieldSelect)) + { + compositeField = (FieldSelect *) expression; + return compositeField; + } + + if (IsA(expression, Var)) + { + candidateColumn = (Var *) expression; + } + else + { + return NULL; + } + + rangeTableEntryIndex = candidateColumn->varno - 1; + rangeTableEntry = list_nth(rangetableList, rangeTableEntryIndex); + + if (rangeTableEntry->rtekind == RTE_SUBQUERY) + { + Query *subquery = rangeTableEntry->subquery; + List *targetEntryList = subquery->targetList; + AttrNumber targetEntryIndex = candidateColumn->varattno - 1; + TargetEntry *subqueryTargetEntry = list_nth(targetEntryList, targetEntryIndex); + + Expr *subqueryExpression = subqueryTargetEntry->expr; + compositeField = CompositeFieldRecursive(subqueryExpression, subquery); + } + else if (rangeTableEntry->rtekind == RTE_JOIN) + { + List *joinColumnList = rangeTableEntry->joinaliasvars; + AttrNumber joinColumnIndex = candidateColumn->varattno - 1; + Expr *joinColumn = list_nth(joinColumnList, joinColumnIndex); + + compositeField = CompositeFieldRecursive(joinColumn, query); + } + + return compositeField; +} + + /* * SubqueryEntryList finds the subquery nodes in the range table entry list, and * builds a list of subquery range table entries from these subquery nodes. Range @@ -259,14 +1104,14 @@ MultiPlanTree(Query *queryTree) List *columnList = NIL; ListCell *columnCell = NULL; + /* we only support single subquery in the entry list */ + Assert(list_length(subqueryEntryList) == 1); + subqueryRangeTableEntry = (RangeTblEntry *) linitial(subqueryEntryList); subqueryTree = subqueryRangeTableEntry->subquery; - /* check if subquery satisfies preconditons */ - ErrorIfSubqueryNotSupported(subqueryTree); - - /* check if subquery has joining tables */ - ErrorIfSubqueryJoin(subqueryTree); + /* ensure if subquery satisfies preconditions */ + Assert(DeferErrorIfUnsupportedSubqueryRepartition(subqueryTree) == NULL); subqueryNode = CitusMakeNode(MultiTable); subqueryNode->relationId = SUBQUERY_RELATION_ID; @@ -602,14 +1447,19 @@ ErrorHintRequired(const char *errorHint, Query *queryTree) /* - * ErrorIfSubqueryNotSupported checks that we can perform distributed planning for - * the given subquery. + * DeferErrorIfSubqueryNotSupported checks that we can perform distributed planning for + * the given subquery. If not, a deferred error is returned. The function recursively + * does this check to all lower levels of the subquery. */ -static void -ErrorIfSubqueryNotSupported(Query *subqueryTree) +static DeferredErrorMessage * +DeferErrorIfUnsupportedSubqueryRepartition(Query *subqueryTree) { char *errorDetail = NULL; bool preconditionsSatisfied = true; + List *joinTreeTableIndexList = NIL; + int rangeTableIndex = 0; + RangeTblEntry *rangeTableEntry = NULL; + Query *innerSubquery = NULL; if (!subqueryTree->hasAggs) { @@ -641,13 +1491,35 @@ ErrorIfSubqueryNotSupported(Query *subqueryTree) errorDetail = "Subqueries with offset are not supported yet"; } - /* finally check and error out if not satisfied */ + /* finally check and return error if conditions are not satisfied */ if (!preconditionsSatisfied) { - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("cannot perform distributed planning on this query"), - errdetail("%s", errorDetail))); + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "cannot perform distributed planning on this query", + errorDetail, NULL); } + + /* + * Extract all range table indexes from the join tree. Note that sub-queries + * that get pulled up by PostgreSQL don't appear in this join tree. + */ + ExtractRangeTableIndexWalker((Node *) subqueryTree->jointree, + &joinTreeTableIndexList); + Assert(list_length(joinTreeTableIndexList) == 1); + + /* continue with the inner subquery */ + rangeTableIndex = linitial_int(joinTreeTableIndexList); + rangeTableEntry = rt_fetch(rangeTableIndex, subqueryTree->rtable); + if (rangeTableEntry->rtekind == RTE_RELATION) + { + return NULL; + } + + Assert(rangeTableEntry->rtekind == RTE_SUBQUERY); + innerSubquery = rangeTableEntry->subquery; + + /* recursively continue to the inner subqueries */ + return DeferErrorIfUnsupportedSubqueryRepartition(innerSubquery); } @@ -2127,32 +2999,6 @@ SubqueryPushdownMultiPlanTree(Query *queryTree) } -/* - * ErrorIfSubqueryJoin errors out if the given query is a join query. Note that - * this function will not be required once we implement subquery joins. - */ -static void -ErrorIfSubqueryJoin(Query *queryTree) -{ - List *joinTreeTableIndexList = NIL; - uint32 joiningRangeTableCount = 0; - - /* - * Extract all range table indexes from the join tree. Note that sub-queries - * that get pulled up by PostgreSQL don't appear in this join tree. - */ - ExtractRangeTableIndexWalker((Node *) queryTree->jointree, &joinTreeTableIndexList); - joiningRangeTableCount = list_length(joinTreeTableIndexList); - - if (joiningRangeTableCount > 1) - { - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("cannot perform distributed planning on this query"), - errdetail("Join in subqueries is not supported yet"))); - } -} - - /* * CreateSubqueryTargetEntryList creates a target entry for each unique column * in the column list and returns the target entry list. diff --git a/src/backend/distributed/planner/multi_physical_planner.c b/src/backend/distributed/planner/multi_physical_planner.c index cb4625e1e..05cdcddf2 100644 --- a/src/backend/distributed/planner/multi_physical_planner.c +++ b/src/backend/distributed/planner/multi_physical_planner.c @@ -124,6 +124,11 @@ static Job * BuildJobTreeTaskList(Job *jobTree, PlannerRestrictionContext *plannerRestrictionContext); static List * SubquerySqlTaskList(Job *job, PlannerRestrictionContext *plannerRestrictionContext); +static void ErrorIfUnsupportedShardDistribution(Query *query); +static bool CoPartitionedTables(Oid firstRelationId, Oid secondRelationId); +static bool ShardIntervalsEqual(FmgrInfo *comparisonFunction, + ShardInterval *firstInterval, + ShardInterval *secondInterval); static Task * SubqueryTaskCreate(Query *originalQuery, ShardInterval *shardInterval, RelationRestrictionContext *restrictionContext, uint32 taskId); @@ -2031,6 +2036,9 @@ SubquerySqlTaskList(Job *job, PlannerRestrictionContext *plannerRestrictionConte RelationRestrictionContext *relationRestrictionContext = plannerRestrictionContext->relationRestrictionContext; + /* error if shards are not co-partitioned */ + ErrorIfUnsupportedShardDistribution(subquery); + /* get list of all range tables in subquery tree */ ExtractRangeTableRelationWalker((Node *) subquery, &rangeTableList); @@ -2082,6 +2090,171 @@ SubquerySqlTaskList(Job *job, PlannerRestrictionContext *plannerRestrictionConte } +/* + * ErrorIfUnsupportedShardDistribution gets list of relations in the given query + * and checks if two conditions below hold for them, otherwise it errors out. + * a. Every relation is distributed by range or hash. This means shards are + * disjoint based on the partition column. + * b. All relations have 1-to-1 shard partitioning between them. This means + * shard count for every relation is same and for every shard in a relation + * there is exactly one shard in other relations with same min/max values. + */ +static void +ErrorIfUnsupportedShardDistribution(Query *query) +{ + Oid firstTableRelationId = InvalidOid; + List *relationIdList = RelationIdList(query); + ListCell *relationIdCell = NULL; + uint32 relationIndex = 0; + uint32 rangeDistributedRelationCount = 0; + uint32 hashDistributedRelationCount = 0; + + foreach(relationIdCell, relationIdList) + { + Oid relationId = lfirst_oid(relationIdCell); + char partitionMethod = PartitionMethod(relationId); + if (partitionMethod == DISTRIBUTE_BY_RANGE) + { + rangeDistributedRelationCount++; + } + else if (partitionMethod == DISTRIBUTE_BY_HASH) + { + hashDistributedRelationCount++; + } + else + { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot push down this subquery"), + errdetail("Currently range and hash partitioned " + "relations are supported"))); + } + } + + if ((rangeDistributedRelationCount > 0) && (hashDistributedRelationCount > 0)) + { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot push down this subquery"), + errdetail("A query including both range and hash " + "partitioned relations are unsupported"))); + } + + foreach(relationIdCell, relationIdList) + { + Oid relationId = lfirst_oid(relationIdCell); + bool coPartitionedTables = false; + Oid currentRelationId = relationId; + + /* get shard list of first relation and continue for the next relation */ + if (relationIndex == 0) + { + firstTableRelationId = relationId; + relationIndex++; + + continue; + } + + /* check if this table has 1-1 shard partitioning with first table */ + coPartitionedTables = CoPartitionedTables(firstTableRelationId, + currentRelationId); + if (!coPartitionedTables) + { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot push down this subquery"), + errdetail("Shards of relations in subquery need to " + "have 1-to-1 shard partitioning"))); + } + } +} + + +/* + * CoPartitionedTables checks if given two distributed tables have 1-to-1 shard + * partitioning. It uses shard interval array that are sorted on interval minimum + * values. Then it compares every shard interval in order and if any pair of + * shard intervals are not equal it returns false. + */ +static bool +CoPartitionedTables(Oid firstRelationId, Oid secondRelationId) +{ + bool coPartitionedTables = true; + uint32 intervalIndex = 0; + DistTableCacheEntry *firstTableCache = DistributedTableCacheEntry(firstRelationId); + DistTableCacheEntry *secondTableCache = DistributedTableCacheEntry(secondRelationId); + ShardInterval **sortedFirstIntervalArray = firstTableCache->sortedShardIntervalArray; + ShardInterval **sortedSecondIntervalArray = + secondTableCache->sortedShardIntervalArray; + uint32 firstListShardCount = firstTableCache->shardIntervalArrayLength; + uint32 secondListShardCount = secondTableCache->shardIntervalArrayLength; + FmgrInfo *comparisonFunction = firstTableCache->shardIntervalCompareFunction; + + if (firstListShardCount != secondListShardCount) + { + return false; + } + + /* if there are not any shards just return true */ + if (firstListShardCount == 0) + { + return true; + } + + Assert(comparisonFunction != NULL); + + for (intervalIndex = 0; intervalIndex < firstListShardCount; intervalIndex++) + { + ShardInterval *firstInterval = sortedFirstIntervalArray[intervalIndex]; + ShardInterval *secondInterval = sortedSecondIntervalArray[intervalIndex]; + + bool shardIntervalsEqual = ShardIntervalsEqual(comparisonFunction, + firstInterval, + secondInterval); + if (!shardIntervalsEqual) + { + coPartitionedTables = false; + break; + } + } + + return coPartitionedTables; +} + + +/* + * ShardIntervalsEqual checks if given shard intervals have equal min/max values. + */ +static bool +ShardIntervalsEqual(FmgrInfo *comparisonFunction, ShardInterval *firstInterval, + ShardInterval *secondInterval) +{ + bool shardIntervalsEqual = false; + Datum firstMin = 0; + Datum firstMax = 0; + Datum secondMin = 0; + Datum secondMax = 0; + + firstMin = firstInterval->minValue; + firstMax = firstInterval->maxValue; + secondMin = secondInterval->minValue; + secondMax = secondInterval->maxValue; + + if (firstInterval->minValueExists && firstInterval->maxValueExists && + secondInterval->minValueExists && secondInterval->maxValueExists) + { + Datum minDatum = CompareCall2(comparisonFunction, firstMin, secondMin); + Datum maxDatum = CompareCall2(comparisonFunction, firstMax, secondMax); + int firstComparison = DatumGetInt32(minDatum); + int secondComparison = DatumGetInt32(maxDatum); + + if (firstComparison == 0 && secondComparison == 0) + { + shardIntervalsEqual = true; + } + } + + return shardIntervalsEqual; +} + + /* * SubqueryTaskCreate creates a sql task by replacing the target * shardInterval's boundary value.. Then performs the normal diff --git a/src/backend/distributed/planner/multi_planner.c b/src/backend/distributed/planner/multi_planner.c index 49a81ee3b..3cebc1e53 100644 --- a/src/backend/distributed/planner/multi_planner.c +++ b/src/backend/distributed/planner/multi_planner.c @@ -310,10 +310,9 @@ CreateDistributedPlan(PlannedStmt *localPlan, Query *originalQuery, Query *query */ if ((!distributedPlan || distributedPlan->planningError) && !hasUnresolvedParams) { - /* Create and optimize logical plan */ - MultiTreeRoot *logicalPlan = MultiLogicalPlanCreate(originalQuery, query); - MultiLogicalPlanOptimize(logicalPlan, plannerRestrictionContext, - originalQuery); + MultiTreeRoot *logicalPlan = MultiLogicalPlanCreate(originalQuery, query, + plannerRestrictionContext); + MultiLogicalPlanOptimize(logicalPlan); /* * This check is here to make it likely that all node types used in diff --git a/src/backend/distributed/planner/relation_restriction_equivalence.c b/src/backend/distributed/planner/relation_restriction_equivalence.c index 3af5056a2..5fb340e17 100644 --- a/src/backend/distributed/planner/relation_restriction_equivalence.c +++ b/src/backend/distributed/planner/relation_restriction_equivalence.c @@ -1366,3 +1366,29 @@ RelationRestrictionPartitionKeyIndex(RelationRestriction *relationRestriction) return InvalidAttrNumber; } + + +/* + * RelationIdList returns list of unique relation ids in query tree. + */ +List * +RelationIdList(Query *query) +{ + List *rangeTableList = NIL; + List *tableEntryList = NIL; + List *relationIdList = NIL; + ListCell *tableEntryCell = NULL; + + ExtractRangeTableRelationWalker((Node *) query, &rangeTableList); + tableEntryList = TableEntryList(rangeTableList); + + foreach(tableEntryCell, tableEntryList) + { + TableEntry *tableEntry = (TableEntry *) lfirst(tableEntryCell); + Oid relationId = tableEntry->relationId; + + relationIdList = list_append_unique_oid(relationIdList, relationId); + } + + return relationIdList; +} diff --git a/src/include/distributed/multi_logical_optimizer.h b/src/include/distributed/multi_logical_optimizer.h index d10a9d1aa..9c3c137fe 100644 --- a/src/include/distributed/multi_logical_optimizer.h +++ b/src/include/distributed/multi_logical_optimizer.h @@ -107,9 +107,7 @@ extern double CountDistinctErrorRate; /* Function declaration for optimizing logical plans */ -extern void MultiLogicalPlanOptimize(MultiTreeRoot *multiTree, - PlannerRestrictionContext *plannerRestrictionContext, - Query *originalQuery); +extern void MultiLogicalPlanOptimize(MultiTreeRoot *multiTree); /* Function declaration for getting partition method for the given relation */ extern char PartitionMethod(Oid relationId); diff --git a/src/include/distributed/multi_logical_planner.h b/src/include/distributed/multi_logical_planner.h index 5b11a8ff3..52c676b4b 100644 --- a/src/include/distributed/multi_logical_planner.h +++ b/src/include/distributed/multi_logical_planner.h @@ -16,6 +16,7 @@ #include "distributed/citus_nodes.h" #include "distributed/multi_join_order.h" +#include "distributed/relation_restriction_equivalence.h" #include "nodes/nodes.h" #include "nodes/primnodes.h" #include "nodes/parsenodes.h" @@ -180,7 +181,9 @@ extern bool SubqueryPushdown; /* Function declarations for building logical plans */ -extern MultiTreeRoot * MultiLogicalPlanCreate(Query *originalQuery, Query *queryTree); +extern MultiTreeRoot * MultiLogicalPlanCreate(Query *originalQuery, Query *queryTree, + PlannerRestrictionContext * + plannerRestrictionContext); extern bool NeedsDistributedPlanning(Query *queryTree); extern MultiNode * ParentNode(MultiNode *multiNode); extern MultiNode * ChildNode(MultiUnaryNode *multiNode); diff --git a/src/include/distributed/relation_restriction_equivalence.h b/src/include/distributed/relation_restriction_equivalence.h index f06a707c5..f0fcd33e9 100644 --- a/src/include/distributed/relation_restriction_equivalence.h +++ b/src/include/distributed/relation_restriction_equivalence.h @@ -19,6 +19,7 @@ extern bool ContainsUnionSubquery(Query *queryTree); extern bool RestrictionEquivalenceForPartitionKeys(PlannerRestrictionContext * plannerRestrictionContext); extern bool SafeToPushdownUnionSubquery(RelationRestrictionContext *restrictionContext); +extern List * RelationIdList(Query *query); #endif /* RELATION_RESTRICTION_EQUIVALENCE_H */ diff --git a/src/test/regress/expected/multi_complex_expressions.out b/src/test/regress/expected/multi_complex_expressions.out index 3723cff0a..0914adb6f 100644 --- a/src/test/regress/expected/multi_complex_expressions.out +++ b/src/test/regress/expected/multi_complex_expressions.out @@ -348,16 +348,14 @@ ERROR: cannot perform local joins that involve expressions DETAIL: local joins can be performed between columns only -- Check that we can issue limit/offset queries -- OFFSET in subqueries are not supported --- Error in the planner when subquery pushdown is off +-- Error in the planner when single repartition subquery +SELECT * FROM (SELECT o_custkey FROM orders GROUP BY o_custkey ORDER BY o_custkey OFFSET 20) sq; +ERROR: cannot perform distributed planning on this query +DETAIL: Subqueries with offset are not supported yet +-- Error in the optimizer when subquery pushdown is on SELECT * FROM (SELECT o_orderkey FROM orders ORDER BY o_orderkey OFFSET 20) sq; ERROR: cannot perform distributed planning on this query DETAIL: Subqueries with offset are not supported yet -SET citus.subquery_pushdown TO true; --- Error in the optimizer when subquery pushdown is on -SELECT * FROM (SELECT o_orderkey FROM orders ORDER BY o_orderkey OFFSET 20) sq; -ERROR: cannot push down this subquery -DETAIL: Offset clause is currently unsupported -SET citus.subquery_pushdown TO false; -- Simple LIMIT/OFFSET with ORDER BY SELECT o_orderkey FROM orders ORDER BY o_orderkey LIMIT 10 OFFSET 20; o_orderkey diff --git a/src/test/regress/expected/multi_mx_router_planner.out b/src/test/regress/expected/multi_mx_router_planner.out index 74b3c0b24..37f3a8718 100644 --- a/src/test/regress/expected/multi_mx_router_planner.out +++ b/src/test/regress/expected/multi_mx_router_planner.out @@ -802,8 +802,8 @@ SELECT * FROM ( (SELECT * FROM articles_hash_mx WHERE author_id = 1) UNION (SELECT * FROM articles_hash_mx WHERE author_id = 2)) uu; -ERROR: cannot perform distributed planning on this query -DETAIL: Subqueries without group by clause are not supported yet +ERROR: cannot push down this subquery +DETAIL: Currently all leaf queries need to have same filters on partition column -- error out for queries with repartition jobs SELECT * FROM articles_hash_mx a, articles_hash_mx b diff --git a/src/test/regress/expected/multi_router_planner.out b/src/test/regress/expected/multi_router_planner.out index d12ca62fd..c2996c6cb 100644 --- a/src/test/regress/expected/multi_router_planner.out +++ b/src/test/regress/expected/multi_router_planner.out @@ -916,8 +916,8 @@ SELECT * FROM ( (SELECT * FROM articles_hash WHERE author_id = 1) UNION (SELECT * FROM articles_hash WHERE author_id = 2)) uu; -ERROR: cannot perform distributed planning on this query -DETAIL: Subqueries without group by clause are not supported yet +ERROR: cannot push down this subquery +DETAIL: Currently all leaf queries need to have same filters on partition column -- error out for queries with repartition jobs SELECT * FROM articles_hash a, articles_hash b diff --git a/src/test/regress/expected/multi_subquery_behavioral_analytics.out b/src/test/regress/expected/multi_subquery_behavioral_analytics.out index fa49997d5..5d2a93480 100644 --- a/src/test/regress/expected/multi_subquery_behavioral_analytics.out +++ b/src/test/regress/expected/multi_subquery_behavioral_analytics.out @@ -6,7 +6,6 @@ -- We don't need shard id sequence here, so commented out to prevent conflicts with concurrent tests -- ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1430000; -- ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1430000; -SET citus.subquery_pushdown TO TRUE; SET citus.enable_router_execution TO FALSE; ------------------------------------ -- Vanilla funnel query @@ -1673,5 +1672,4 @@ WHERE b.user_id IS NULL GROUP BY a.user_id; ERROR: cannot push down this subquery DETAIL: Table expressions other than simple relations and subqueries are currently unsupported -SET citus.subquery_pushdown TO FALSE; SET citus.enable_router_execution TO TRUE; diff --git a/src/test/regress/expected/multi_subquery_complex_queries.out b/src/test/regress/expected/multi_subquery_complex_queries.out index 294327710..636a0f553 100644 --- a/src/test/regress/expected/multi_subquery_complex_queries.out +++ b/src/test/regress/expected/multi_subquery_complex_queries.out @@ -7,7 +7,6 @@ -- ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1400000; ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1400000; -SET citus.subquery_pushdown TO TRUE; SET citus.enable_router_execution TO FALSE; -- -- UNIONs and JOINs mixed @@ -2025,8 +2024,8 @@ FROM ( ) user_id ORDER BY 1, 2 limit 10; -ERROR: cannot push down this subquery -DETAIL: Window functions are currently unsupported +ERROR: cannot perform distributed planning on this query +DETAIL: Subqueries without group by clause are not supported yet -- not supported due to non relation rte SELECT ("final_query"."event_types") as types, count(*) AS sumOfEventType FROM @@ -2113,5 +2112,4 @@ GROUP BY types ORDER BY types; ERROR: cannot push down this subquery DETAIL: Subqueries without relations are unsupported -SET citus.subquery_pushdown TO FALSE; SET citus.enable_router_execution TO TRUE; diff --git a/src/test/regress/expected/multi_subquery_union.out b/src/test/regress/expected/multi_subquery_union.out index 3b2be3465..f6ea5e9ac 100644 --- a/src/test/regress/expected/multi_subquery_union.out +++ b/src/test/regress/expected/multi_subquery_union.out @@ -4,7 +4,6 @@ -- the tables that are used depends to multi_insert_select_behavioral_analytics_create_table.sql -- We don't need shard id sequence here, so commented out to prevent conflicts with concurrent tests -- ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1400000; -SET citus.subquery_pushdown TO true; SET citus.enable_router_execution TO false; -- a very simple union query SELECT user_id, counter @@ -845,5 +844,4 @@ GROUP BY types ORDER BY types; ERROR: cannot push down this subquery DETAIL: Subqueries without relations are unsupported -SET citus.subquery_pushdown TO false; SET citus.enable_router_execution TO true; diff --git a/src/test/regress/expected/multi_view.out b/src/test/regress/expected/multi_view.out index f8444c562..159ca78f2 100644 --- a/src/test/regress/expected/multi_view.out +++ b/src/test/regress/expected/multi_view.out @@ -234,16 +234,33 @@ SET citus.task_executor_type to DEFAULT; -- create a view with aggregate CREATE VIEW lineitems_by_shipping_method AS SELECT l_shipmode, count(*) as cnt FROM lineitem_hash_part GROUP BY 1; --- following will fail due to non-flattening of subquery due to GROUP BY +-- following will fail due to non GROUP BY of partition key SELECT * FROM lineitems_by_shipping_method; ERROR: Unrecognized range table id 1 -- create a view with group by on partition column CREATE VIEW lineitems_by_orderkey AS - SELECT l_orderkey, count(*) FROM lineitem_hash_part GROUP BY 1; --- this will also fail due to same reason -SELECT * FROM lineitems_by_orderkey; -ERROR: Unrecognized range table id 1 --- however it would work if it is made router plannable + SELECT + l_orderkey, count(*) + FROM + lineitem_hash_part + GROUP BY 1; +-- this should work since we're able to push down this query +SELECT * FROM lineitems_by_orderkey ORDER BY 2 DESC, 1 ASC LIMIT 10; + l_orderkey | count +------------+------- + 7 | 7 + 68 | 7 + 129 | 7 + 164 | 7 + 194 | 7 + 225 | 7 + 226 | 7 + 322 | 7 + 326 | 7 + 354 | 7 +(10 rows) + +-- it would also work since it is made router plannable SELECT * FROM lineitems_by_orderkey WHERE l_orderkey = 100; l_orderkey | count ------------+------- diff --git a/src/test/regress/input/multi_complex_count_distinct.source b/src/test/regress/input/multi_complex_count_distinct.source index ffd4438a1..1ca2cc617 100644 --- a/src/test/regress/input/multi_complex_count_distinct.source +++ b/src/test/regress/input/multi_complex_count_distinct.source @@ -130,9 +130,9 @@ SELECT * SELECT * FROM ( SELECT - l_orderkey, count(DISTINCT 1) + l_linenumber, count(DISTINCT 1) FROM lineitem_hash - GROUP BY l_orderkey) sub + GROUP BY l_linenumber) sub ORDER BY 2 DESC, 1 DESC LIMIT 10; @@ -140,9 +140,9 @@ SELECT * SELECT * FROM ( SELECT - l_orderkey, count(DISTINCT (random() * 5)::int) + l_linenumber, count(DISTINCT (random() * 5)::int) FROM lineitem_hash - GROUP BY l_orderkey) sub + GROUP BY l_linenumber) sub ORDER BY 2 DESC, 1 DESC LIMIT 10; @@ -277,18 +277,18 @@ SELECT * SELECT * FROM ( SELECT - l_orderkey, sum(DISTINCT l_partkey) + l_linenumber, sum(DISTINCT l_partkey) FROM lineitem_hash - GROUP BY l_orderkey) sub + GROUP BY l_linenumber) sub ORDER BY 2 DESC, 1 DESC LIMIT 10; SELECT * FROM ( SELECT - l_orderkey, avg(DISTINCT l_partkey) + l_linenumber, avg(DISTINCT l_partkey) FROM lineitem_hash - GROUP BY l_orderkey) sub + GROUP BY l_linenumber) sub ORDER BY 2 DESC, 1 DESC LIMIT 10; @@ -297,18 +297,18 @@ SELECT * SELECT * FROM ( SELECT - l_orderkey, count(DISTINCT lineitem_hash) + l_linenumber, count(DISTINCT lineitem_hash) FROM lineitem_hash - GROUP BY l_orderkey) sub + GROUP BY l_linenumber) sub ORDER BY 2 DESC, 1 DESC LIMIT 10; SELECT * FROM ( SELECT - l_orderkey, count(DISTINCT lineitem_hash.*) + l_linenumber, count(DISTINCT lineitem_hash.*) FROM lineitem_hash - GROUP BY l_orderkey) sub + GROUP BY l_linenumber) sub ORDER BY 2 DESC, 1 DESC LIMIT 10; diff --git a/src/test/regress/input/multi_subquery.source b/src/test/regress/input/multi_subquery.source index 9d84e6a74..94fb2c806 100644 --- a/src/test/regress/input/multi_subquery.source +++ b/src/test/regress/input/multi_subquery.source @@ -46,7 +46,7 @@ SELECT master_create_distributed_table('orders_subquery', 'o_orderkey', 'range') SET citus.enable_router_execution TO 'false'; --- Check that we don't allow subquery pushdown in default settings. +-- Check that we allow subquery pushdown in default settings. SELECT avg(unit_price) @@ -62,8 +62,6 @@ FROM GROUP BY l_orderkey) AS unit_prices; -SET citus.subquery_pushdown to TRUE; - -- Check that we don't crash if there are not any shards. SELECT @@ -130,7 +128,8 @@ FROM UPDATE pg_dist_shard SET shardmaxvalue = '14947' WHERE shardid = 270003; --- If group by is not on partition column then we error out. +-- If group by is not on partition column then we error out from single table +-- repartition code path SELECT avg(order_count) @@ -227,6 +226,8 @@ FROM ( count(*) a FROM lineitem_subquery + GROUP BY + l_orderkey ) z; -- Check supported subquery types. @@ -337,7 +338,6 @@ CREATE TABLE subquery_pruning_varchar_test_table SELECT master_create_distributed_table('subquery_pruning_varchar_test_table', 'a', 'hash'); SELECT master_create_worker_shards('subquery_pruning_varchar_test_table', 4, 1); -SET citus.subquery_pushdown TO TRUE; SET client_min_messages TO DEBUG2; SELECT * FROM @@ -925,8 +925,6 @@ WHERE shardid = :new_shard_id; \. -SET citus.subquery_pushdown TO TRUE; - -- Simple join subquery pushdown SELECT avg(array_length(events, 1)) AS event_average diff --git a/src/test/regress/output/multi_complex_count_distinct.source b/src/test/regress/output/multi_complex_count_distinct.source index 707438761..59b4045ad 100644 --- a/src/test/regress/output/multi_complex_count_distinct.source +++ b/src/test/regress/output/multi_complex_count_distinct.source @@ -241,9 +241,9 @@ SELECT * SELECT * FROM ( SELECT - l_orderkey, count(DISTINCT 1) + l_linenumber, count(DISTINCT 1) FROM lineitem_hash - GROUP BY l_orderkey) sub + GROUP BY l_linenumber) sub ORDER BY 2 DESC, 1 DESC LIMIT 10; ERROR: cannot compute aggregate (distinct) @@ -253,9 +253,9 @@ HINT: You can load the hll extension from contrib packages and enable distinct SELECT * FROM ( SELECT - l_orderkey, count(DISTINCT (random() * 5)::int) + l_linenumber, count(DISTINCT (random() * 5)::int) FROM lineitem_hash - GROUP BY l_orderkey) sub + GROUP BY l_linenumber) sub ORDER BY 2 DESC, 1 DESC LIMIT 10; ERROR: cannot compute aggregate (distinct) @@ -464,9 +464,9 @@ SELECT * SELECT * FROM ( SELECT - l_orderkey, sum(DISTINCT l_partkey) + l_linenumber, sum(DISTINCT l_partkey) FROM lineitem_hash - GROUP BY l_orderkey) sub + GROUP BY l_linenumber) sub ORDER BY 2 DESC, 1 DESC LIMIT 10; ERROR: cannot compute aggregate (distinct) @@ -474,9 +474,9 @@ DETAIL: Only count(distinct) aggregate is supported in subqueries SELECT * FROM ( SELECT - l_orderkey, avg(DISTINCT l_partkey) + l_linenumber, avg(DISTINCT l_partkey) FROM lineitem_hash - GROUP BY l_orderkey) sub + GROUP BY l_linenumber) sub ORDER BY 2 DESC, 1 DESC LIMIT 10; ERROR: cannot compute aggregate (distinct) @@ -486,9 +486,9 @@ DETAIL: Only count(distinct) aggregate is supported in subqueries SELECT * FROM ( SELECT - l_orderkey, count(DISTINCT lineitem_hash) + l_linenumber, count(DISTINCT lineitem_hash) FROM lineitem_hash - GROUP BY l_orderkey) sub + GROUP BY l_linenumber) sub ORDER BY 2 DESC, 1 DESC LIMIT 10; ERROR: cannot compute count (distinct) @@ -496,9 +496,9 @@ DETAIL: Non-column references are not supported yet SELECT * FROM ( SELECT - l_orderkey, count(DISTINCT lineitem_hash.*) + l_linenumber, count(DISTINCT lineitem_hash.*) FROM lineitem_hash - GROUP BY l_orderkey) sub + GROUP BY l_linenumber) sub ORDER BY 2 DESC, 1 DESC LIMIT 10; ERROR: cannot compute count (distinct) diff --git a/src/test/regress/output/multi_subquery.source b/src/test/regress/output/multi_subquery.source index e2bff0987..7b8c2a56b 100644 --- a/src/test/regress/output/multi_subquery.source +++ b/src/test/regress/output/multi_subquery.source @@ -53,7 +53,7 @@ SELECT master_create_distributed_table('orders_subquery', 'o_orderkey', 'range') (1 row) SET citus.enable_router_execution TO 'false'; --- Check that we don't allow subquery pushdown in default settings. +-- Check that we allow subquery pushdown in default settings. SELECT avg(unit_price) FROM @@ -67,9 +67,11 @@ FROM l_orderkey = o_orderkey GROUP BY l_orderkey) AS unit_prices; -ERROR: cannot perform distributed planning on this query -DETAIL: Join in subqueries is not supported yet -SET citus.subquery_pushdown to TRUE; + avg +----- + +(1 row) + -- Check that we don't crash if there are not any shards. SELECT avg(unit_price) @@ -129,7 +131,8 @@ ERROR: cannot push down this subquery DETAIL: Shards of relations in subquery need to have 1-to-1 shard partitioning -- Update metadata in order to make all shards equal. UPDATE pg_dist_shard SET shardmaxvalue = '14947' WHERE shardid = 270003; --- If group by is not on partition column then we error out. +-- If group by is not on partition column then we error out from single table +-- repartition code path SELECT avg(order_count) FROM @@ -140,8 +143,8 @@ FROM lineitem_subquery GROUP BY l_suppkey) AS order_counts; -ERROR: cannot push down this subquery -DETAIL: Group by list without partition column is currently unsupported +ERROR: cannot use real time executor with repartition jobs +HINT: Set citus.task_executor_type to "task-tracker". -- Check that we error out if join is not on partition columns. SELECT avg(unit_price) @@ -223,6 +226,8 @@ FROM ( count(*) a FROM lineitem_subquery + GROUP BY + l_orderkey ) z; ERROR: cannot push down this subquery DETAIL: distinct in the outermost query is unsupported @@ -357,7 +362,6 @@ SELECT master_create_worker_shards('subquery_pruning_varchar_test_table', 4, 1); (1 row) -SET citus.subquery_pushdown TO TRUE; SET client_min_messages TO DEBUG2; SELECT * FROM (SELECT count(*) FROM subquery_pruning_varchar_test_table WHERE a = 'onder' GROUP BY a) @@ -884,7 +888,6 @@ SELECT master_create_empty_shard('users') AS new_shard_id UPDATE pg_dist_shard SET shardminvalue = '(2,2000000001)', shardmaxvalue = '(2,4300000000)' WHERE shardid = :new_shard_id; \COPY users FROM STDIN WITH CSV -SET citus.subquery_pushdown TO TRUE; -- Simple join subquery pushdown SELECT avg(array_length(events, 1)) AS event_average diff --git a/src/test/regress/output/multi_subquery_0.source b/src/test/regress/output/multi_subquery_0.source index 0f719f196..4c27449f6 100644 --- a/src/test/regress/output/multi_subquery_0.source +++ b/src/test/regress/output/multi_subquery_0.source @@ -53,7 +53,7 @@ SELECT master_create_distributed_table('orders_subquery', 'o_orderkey', 'range') (1 row) SET citus.enable_router_execution TO 'false'; --- Check that we don't allow subquery pushdown in default settings. +-- Check that we allow subquery pushdown in default settings. SELECT avg(unit_price) FROM @@ -67,9 +67,11 @@ FROM l_orderkey = o_orderkey GROUP BY l_orderkey) AS unit_prices; -ERROR: cannot perform distributed planning on this query -DETAIL: Join in subqueries is not supported yet -SET citus.subquery_pushdown to TRUE; + avg +----- + +(1 row) + -- Check that we don't crash if there are not any shards. SELECT avg(unit_price) @@ -129,7 +131,8 @@ ERROR: cannot push down this subquery DETAIL: Shards of relations in subquery need to have 1-to-1 shard partitioning -- Update metadata in order to make all shards equal. UPDATE pg_dist_shard SET shardmaxvalue = '14947' WHERE shardid = 270003; --- If group by is not on partition column then we error out. +-- If group by is not on partition column then we error out from single table +-- repartition code path SELECT avg(order_count) FROM @@ -140,8 +143,8 @@ FROM lineitem_subquery GROUP BY l_suppkey) AS order_counts; -ERROR: cannot push down this subquery -DETAIL: Group by list without partition column is currently unsupported +ERROR: cannot use real time executor with repartition jobs +HINT: Set citus.task_executor_type to "task-tracker". -- Check that we error out if join is not on partition columns. SELECT avg(unit_price) @@ -223,6 +226,8 @@ FROM ( count(*) a FROM lineitem_subquery + GROUP BY + l_orderkey ) z; ERROR: cannot push down this subquery DETAIL: distinct in the outermost query is unsupported @@ -357,7 +362,6 @@ SELECT master_create_worker_shards('subquery_pruning_varchar_test_table', 4, 1); (1 row) -SET citus.subquery_pushdown TO TRUE; SET client_min_messages TO DEBUG2; SELECT * FROM (SELECT count(*) FROM subquery_pruning_varchar_test_table WHERE a = 'onder' GROUP BY a) @@ -884,7 +888,6 @@ SELECT master_create_empty_shard('users') AS new_shard_id UPDATE pg_dist_shard SET shardminvalue = '(2,2000000001)', shardmaxvalue = '(2,4300000000)' WHERE shardid = :new_shard_id; \COPY users FROM STDIN WITH CSV -SET citus.subquery_pushdown TO TRUE; -- Simple join subquery pushdown SELECT avg(array_length(events, 1)) AS event_average diff --git a/src/test/regress/sql/multi_complex_expressions.sql b/src/test/regress/sql/multi_complex_expressions.sql index 236e8f4c6..4891b2103 100644 --- a/src/test/regress/sql/multi_complex_expressions.sql +++ b/src/test/regress/sql/multi_complex_expressions.sql @@ -161,13 +161,11 @@ SELECT count(*) FROM lineitem, orders WHERE l_orderkey + 1 = o_orderkey; -- Check that we can issue limit/offset queries -- OFFSET in subqueries are not supported --- Error in the planner when subquery pushdown is off -SELECT * FROM (SELECT o_orderkey FROM orders ORDER BY o_orderkey OFFSET 20) sq; -SET citus.subquery_pushdown TO true; +-- Error in the planner when single repartition subquery +SELECT * FROM (SELECT o_custkey FROM orders GROUP BY o_custkey ORDER BY o_custkey OFFSET 20) sq; -- Error in the optimizer when subquery pushdown is on SELECT * FROM (SELECT o_orderkey FROM orders ORDER BY o_orderkey OFFSET 20) sq; -SET citus.subquery_pushdown TO false; -- Simple LIMIT/OFFSET with ORDER BY SELECT o_orderkey FROM orders ORDER BY o_orderkey LIMIT 10 OFFSET 20; diff --git a/src/test/regress/sql/multi_subquery_behavioral_analytics.sql b/src/test/regress/sql/multi_subquery_behavioral_analytics.sql index ee86bf1f1..14fe41709 100644 --- a/src/test/regress/sql/multi_subquery_behavioral_analytics.sql +++ b/src/test/regress/sql/multi_subquery_behavioral_analytics.sql @@ -8,7 +8,6 @@ -- ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1430000; -- ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1430000; -SET citus.subquery_pushdown TO TRUE; SET citus.enable_router_execution TO FALSE; ------------------------------------ @@ -1346,5 +1345,4 @@ FROM (SELECT WHERE b.user_id IS NULL GROUP BY a.user_id; -SET citus.subquery_pushdown TO FALSE; SET citus.enable_router_execution TO TRUE; diff --git a/src/test/regress/sql/multi_subquery_complex_queries.sql b/src/test/regress/sql/multi_subquery_complex_queries.sql index 018758b8e..412c73793 100644 --- a/src/test/regress/sql/multi_subquery_complex_queries.sql +++ b/src/test/regress/sql/multi_subquery_complex_queries.sql @@ -8,7 +8,6 @@ -- ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1400000; ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1400000; -SET citus.subquery_pushdown TO TRUE; SET citus.enable_router_execution TO FALSE; -- @@ -1870,5 +1869,4 @@ INNER JOIN GROUP BY types ORDER BY types; -SET citus.subquery_pushdown TO FALSE; SET citus.enable_router_execution TO TRUE; diff --git a/src/test/regress/sql/multi_subquery_union.sql b/src/test/regress/sql/multi_subquery_union.sql index 4daac4cce..7589128b4 100644 --- a/src/test/regress/sql/multi_subquery_union.sql +++ b/src/test/regress/sql/multi_subquery_union.sql @@ -6,7 +6,6 @@ -- We don't need shard id sequence here, so commented out to prevent conflicts with concurrent tests -- ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1400000; -SET citus.subquery_pushdown TO true; SET citus.enable_router_execution TO false; -- a very simple union query SELECT user_id, counter @@ -673,5 +672,4 @@ FROM GROUP BY types ORDER BY types; -SET citus.subquery_pushdown TO false; SET citus.enable_router_execution TO true; diff --git a/src/test/regress/sql/multi_view.sql b/src/test/regress/sql/multi_view.sql index 164e5a48f..af7294334 100644 --- a/src/test/regress/sql/multi_view.sql +++ b/src/test/regress/sql/multi_view.sql @@ -123,17 +123,21 @@ SET citus.task_executor_type to DEFAULT; CREATE VIEW lineitems_by_shipping_method AS SELECT l_shipmode, count(*) as cnt FROM lineitem_hash_part GROUP BY 1; --- following will fail due to non-flattening of subquery due to GROUP BY +-- following will fail due to non GROUP BY of partition key SELECT * FROM lineitems_by_shipping_method; -- create a view with group by on partition column CREATE VIEW lineitems_by_orderkey AS - SELECT l_orderkey, count(*) FROM lineitem_hash_part GROUP BY 1; + SELECT + l_orderkey, count(*) + FROM + lineitem_hash_part + GROUP BY 1; --- this will also fail due to same reason -SELECT * FROM lineitems_by_orderkey; +-- this should work since we're able to push down this query +SELECT * FROM lineitems_by_orderkey ORDER BY 2 DESC, 1 ASC LIMIT 10; --- however it would work if it is made router plannable +-- it would also work since it is made router plannable SELECT * FROM lineitems_by_orderkey WHERE l_orderkey = 100; DROP TABLE temp_lineitem CASCADE;