diff --git a/src/backend/distributed/planner/multi_logical_optimizer.c b/src/backend/distributed/planner/multi_logical_optimizer.c index 184a76481..20945f439 100644 --- a/src/backend/distributed/planner/multi_logical_optimizer.c +++ b/src/backend/distributed/planner/multi_logical_optimizer.c @@ -48,7 +48,6 @@ #include "utils/fmgroids.h" #include "utils/lsyscache.h" #include "utils/rel.h" -#include "utils/relcache.h" #include "utils/syscache.h" #include "utils/tqual.h" @@ -148,28 +147,6 @@ static bool TablePartitioningSupportsDistinct(List *tableNodeList, Var *distinctColumn); static bool GroupedByColumn(List *groupClauseList, List *targetList, Var *column); -/* Local functions forward declarations for subquery pushdown checks */ -static void ErrorIfContainsUnsupportedSubquery(MultiNode *logicalPlanNode, - PlannerRestrictionContext * - plannerRestrictionContext, - Query *originalQuery); -static void ErrorIfCannotPushdownSubquery(Query *subqueryTree, bool outerQueryHasLimit); -static void ErrorIfUnsupportedSetOperation(Query *subqueryTree, bool outerQueryHasLimit); -static bool ExtractSetOperationStatmentWalker(Node *node, List **setOperationList); -static void ErrorIfUnsupportedTableCombination(Query *queryTree); -static bool TargetListOnPartitionColumn(Query *query, List *targetEntryList); -static FieldSelect * CompositeFieldRecursive(Expr *expression, Query *query); -static bool FullCompositeFieldList(List *compositeFieldList); -static void ErrorIfUnsupportedShardDistribution(Query *query); -static List * RelationIdList(Query *query); -static bool CoPartitionedTables(Oid firstRelationId, Oid secondRelationId); -static bool ShardIntervalsEqual(FmgrInfo *comparisonFunction, - ShardInterval *firstInterval, - ShardInterval *secondInterval); -static void ErrorIfUnsupportedFilters(Query *subquery); -static bool EqualOpExpressionLists(List *firstOpExpressionList, - List *secondOpExpressionList); - /* Local functions forward declarations for limit clauses */ static Node * WorkerLimitCount(MultiExtendedOp *originalOpNode); static List * WorkerSortClauseList(MultiExtendedOp *originalOpNode); @@ -192,16 +169,9 @@ static bool HasOrderByHllType(List *sortClauseList, List *targetList); * Third, the function pulls up the collect operators in the tree. Fourth, the * function finds the extended operator node, and splits this node into master * and worker extended operator nodes. - * - * We also pass plannerRestrictionContext and originalQuery to the optimizer. - * These are primarily used to decide whether the subquery is safe to pushdown. - * If not, it helps to produce meaningful error messages for subquery - * pushdown planning. */ void -MultiLogicalPlanOptimize(MultiTreeRoot *multiLogicalPlan, - PlannerRestrictionContext *plannerRestrictionContext, - Query *originalQuery) +MultiLogicalPlanOptimize(MultiTreeRoot *multiLogicalPlan) { bool hasOrderByHllType = false; List *selectNodeList = NIL; @@ -220,10 +190,6 @@ MultiLogicalPlanOptimize(MultiTreeRoot *multiLogicalPlan, /* check that we can optimize aggregates in the plan */ ErrorIfContainsUnsupportedAggregate(logicalPlanNode); - /* check that we can pushdown subquery in the plan */ - ErrorIfContainsUnsupportedSubquery(logicalPlanNode, plannerRestrictionContext, - originalQuery); - /* * If a select node exists, we use the idempower property to split the node * into two nodes that contain And and Or clauses. 
If both And and Or nodes @@ -2809,81 +2775,6 @@ GroupedByColumn(List *groupClauseList, List *targetList, Var *column) } -/* - * ErrorIfContainsUnsupportedSubquery extracts subquery multi table from the - * logical plan using plannerRestrictionContext and the original query. It uses - * some helper functions to check if we can push down subquery to worker nodes. - * These helper functions error out if we cannot push down the subquery. - */ -static void -ErrorIfContainsUnsupportedSubquery(MultiNode *logicalPlanNode, - PlannerRestrictionContext *plannerRestrictionContext, - Query *originalQuery) -{ - Query *subquery = NULL; - List *extendedOpNodeList = NIL; - MultiTable *multiTable = NULL; - MultiExtendedOp *extendedOpNode = NULL; - bool outerQueryHasLimit = false; - RelationRestrictionContext *relationRestrictionContext = - plannerRestrictionContext->relationRestrictionContext; - - /* check if logical plan includes a subquery */ - List *subqueryMultiTableList = SubqueryMultiTableList(logicalPlanNode); - if (subqueryMultiTableList == NIL) - { - return; - } - - /* currently in the planner we only allow one subquery in from-clause*/ - Assert(list_length(subqueryMultiTableList) == 1); - - /* - * We're checking two things here: - * (i) If the query contains a top level union, ensure that all leaves - * return the partition key at the same position - * (ii) Else, check whether all relations joined on the partition key or not - */ - if (ContainsUnionSubquery(originalQuery)) - { - if (!SafeToPushdownUnionSubquery(relationRestrictionContext)) - { - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("cannot pushdown the subquery since all leaves of " - "the UNION does not include partition key at the " - "same position"), - errdetail("Each leaf query of the UNION should return " - "partition key at the same position on its " - "target list."))); - } - } - else if (!RestrictionEquivalenceForPartitionKeys(plannerRestrictionContext)) - { - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("cannot pushdown the subquery since all relations are not " - "joined using distribution keys"), - errdetail("Each relation should be joined with at least " - "one another relation using distribution keys and " - "equality operator."))); - } - - multiTable = (MultiTable *) linitial(subqueryMultiTableList); - subquery = multiTable->subquery; - - extendedOpNodeList = FindNodesOfType(logicalPlanNode, T_MultiExtendedOp); - extendedOpNode = (MultiExtendedOp *) linitial(extendedOpNodeList); - - if (extendedOpNode->limitCount) - { - outerQueryHasLimit = true; - } - - ErrorIfCannotPushdownSubquery(subquery, outerQueryHasLimit); - ErrorIfUnsupportedShardDistribution(subquery); - ErrorIfUnsupportedFilters(subquery); -} - - /* * SubqueryMultiTableList extracts multi tables in the given logical plan tree * and returns subquery multi tables in a new list. @@ -2910,290 +2801,6 @@ SubqueryMultiTableList(MultiNode *multiNode) } -/* - * ErrorIfCannotPushdownSubquery recursively checks if we can push down the given - * subquery to worker nodes. If we cannot push down the subquery, this function - * errors out. - * - * We can push down a subquery if it follows rules below. We support nested queries - * as long as they follow the same rules, and we recurse to validate each subquery - * for this given query. - * a. If there is an aggregate, it must be grouped on partition column. - * b. If there is a join, it must be between two regular tables or two subqueries. 
- * We don't support join between a regular table and a subquery. And columns on - * the join condition must be partition columns. - * c. If there is a distinct clause, it must be on the partition column. - * - * This function is very similar to ErrorIfQueryNotSupported() in logical - * planner, but we don't reuse it, because differently for subqueries we support - * a subset of distinct, union and left joins. - * - * Note that this list of checks is not exhaustive, there can be some cases - * which we let subquery to run but returned results would be wrong. Such as if - * a subquery has a group by on another subquery which includes order by with - * limit, we let this query to run, but results could be wrong depending on the - * features of underlying tables. - */ -static void -ErrorIfCannotPushdownSubquery(Query *subqueryTree, bool outerQueryHasLimit) -{ - bool preconditionsSatisfied = true; - char *errorDetail = NULL; - List *subqueryEntryList = NIL; - ListCell *rangeTableEntryCell = NULL; - - ErrorIfUnsupportedTableCombination(subqueryTree); - - if (subqueryTree->hasSubLinks) - { - preconditionsSatisfied = false; - errorDetail = "Subqueries other than from-clause subqueries are unsupported"; - } - - if (subqueryTree->rtable == NIL) - { - preconditionsSatisfied = false; - errorDetail = "Subqueries without relations are unsupported"; - } - - if (subqueryTree->hasWindowFuncs) - { - preconditionsSatisfied = false; - errorDetail = "Window functions are currently unsupported"; - } - - if (subqueryTree->limitOffset) - { - preconditionsSatisfied = false; - errorDetail = "Offset clause is currently unsupported"; - } - - if (subqueryTree->limitCount && !outerQueryHasLimit) - { - preconditionsSatisfied = false; - errorDetail = "Limit in subquery without limit in the outer query is unsupported"; - } - - if (subqueryTree->setOperations) - { - ErrorIfUnsupportedSetOperation(subqueryTree, outerQueryHasLimit); - } - - if (subqueryTree->hasRecursive) - { - preconditionsSatisfied = false; - errorDetail = "Recursive queries are currently unsupported"; - } - - if (subqueryTree->cteList) - { - preconditionsSatisfied = false; - errorDetail = "Common Table Expressions are currently unsupported"; - } - - if (subqueryTree->hasForUpdate) - { - preconditionsSatisfied = false; - errorDetail = "For Update/Share commands are currently unsupported"; - } - - /* group clause list must include partition column */ - if (subqueryTree->groupClause) - { - List *groupClauseList = subqueryTree->groupClause; - List *targetEntryList = subqueryTree->targetList; - List *groupTargetEntryList = GroupTargetEntryList(groupClauseList, - targetEntryList); - bool groupOnPartitionColumn = TargetListOnPartitionColumn(subqueryTree, - groupTargetEntryList); - if (!groupOnPartitionColumn) - { - preconditionsSatisfied = false; - errorDetail = "Group by list without partition column is currently " - "unsupported"; - } - } - - /* we don't support aggregates without group by */ - if (subqueryTree->hasAggs && (subqueryTree->groupClause == NULL)) - { - preconditionsSatisfied = false; - errorDetail = "Aggregates without group by are currently unsupported"; - } - - /* having clause without group by on partition column is not supported */ - if (subqueryTree->havingQual && (subqueryTree->groupClause == NULL)) - { - preconditionsSatisfied = false; - errorDetail = "Having qual without group by on partition column is " - "currently unsupported"; - } - - /* distinct clause list must include partition column */ - if (subqueryTree->distinctClause) - { - 
List *distinctClauseList = subqueryTree->distinctClause; - List *targetEntryList = subqueryTree->targetList; - List *distinctTargetEntryList = GroupTargetEntryList(distinctClauseList, - targetEntryList); - bool distinctOnPartitionColumn = - TargetListOnPartitionColumn(subqueryTree, distinctTargetEntryList); - if (!distinctOnPartitionColumn) - { - preconditionsSatisfied = false; - errorDetail = "Distinct on columns without partition column is " - "currently unsupported"; - } - } - - /* finally check and error out if not satisfied */ - if (!preconditionsSatisfied) - { - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("cannot push down this subquery"), - errdetail("%s", errorDetail))); - } - - /* recursively do same check for subqueries of this query */ - subqueryEntryList = SubqueryEntryList(subqueryTree); - foreach(rangeTableEntryCell, subqueryEntryList) - { - RangeTblEntry *rangeTableEntry = - (RangeTblEntry *) lfirst(rangeTableEntryCell); - - Query *innerSubquery = rangeTableEntry->subquery; - ErrorIfCannotPushdownSubquery(innerSubquery, outerQueryHasLimit); - } -} - - -/* - * ErrorIfUnsupportedSetOperation is a helper function for ErrorIfCannotPushdownSubquery(). - * It basically iterates over the subqueries that reside under the given set operations. - * - * The function also errors out for set operations INTERSECT and EXCEPT. - */ -static void -ErrorIfUnsupportedSetOperation(Query *subqueryTree, bool outerQueryHasLimit) -{ - List *rangeTableList = subqueryTree->rtable; - List *rangeTableIndexList = NIL; - ListCell *rangeTableIndexCell = NULL; - List *setOperationStatementList = NIL; - ListCell *setOperationStatmentCell = NULL; - - ExtractSetOperationStatmentWalker((Node *) subqueryTree->setOperations, - &setOperationStatementList); - foreach(setOperationStatmentCell, setOperationStatementList) - { - SetOperationStmt *setOperation = - (SetOperationStmt *) lfirst(setOperationStatmentCell); - - if (setOperation->op != SETOP_UNION) - { - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("cannot push down this subquery"), - errdetail("Intersect and Except are currently unsupported"))); - } - } - - ExtractRangeTableIndexWalker((Node *) subqueryTree->setOperations, - &rangeTableIndexList); - foreach(rangeTableIndexCell, rangeTableIndexList) - { - int rangeTableIndex = lfirst_int(rangeTableIndexCell); - RangeTblEntry *rangeTableEntry = rt_fetch(rangeTableIndex, rangeTableList); - - Assert(rangeTableEntry->rtekind == RTE_SUBQUERY); - - ErrorIfCannotPushdownSubquery(rangeTableEntry->subquery, outerQueryHasLimit); - } -} - - -/* - * ExtractSetOperationStatementWalker walks over a set operations statment, - * and finds all set operations in the tree. - */ -static bool -ExtractSetOperationStatmentWalker(Node *node, List **setOperationList) -{ - bool walkerResult = false; - if (node == NULL) - { - return false; - } - - if (IsA(node, SetOperationStmt)) - { - SetOperationStmt *setOperation = (SetOperationStmt *) node; - - (*setOperationList) = lappend(*setOperationList, setOperation); - } - - walkerResult = expression_tree_walker(node, ExtractSetOperationStatmentWalker, - setOperationList); - - return walkerResult; -} - - -/* - * ErrorIfUnsupportedTableCombination checks if the given query tree contains any - * unsupported range table combinations. For this, the function walks over all - * range tables in the join tree, and checks if they correspond to simple relations - * or subqueries. 
- */ -static void -ErrorIfUnsupportedTableCombination(Query *queryTree) -{ - List *rangeTableList = queryTree->rtable; - List *joinTreeTableIndexList = NIL; - ListCell *joinTreeTableIndexCell = NULL; - bool unsupporteTableCombination = false; - char *errorDetail = NULL; - - /* - * Extract all range table indexes from the join tree. Note that sub-queries - * that get pulled up by PostgreSQL don't appear in this join tree. - */ - ExtractRangeTableIndexWalker((Node *) queryTree->jointree, &joinTreeTableIndexList); - foreach(joinTreeTableIndexCell, joinTreeTableIndexList) - { - /* - * Join tree's range table index starts from 1 in the query tree. But, - * list indexes start from 0. - */ - int joinTreeTableIndex = lfirst_int(joinTreeTableIndexCell); - int rangeTableListIndex = joinTreeTableIndex - 1; - - RangeTblEntry *rangeTableEntry = - (RangeTblEntry *) list_nth(rangeTableList, rangeTableListIndex); - - /* - * Check if the range table in the join tree is a simple relation or a - * subquery. - */ - if (rangeTableEntry->rtekind != RTE_RELATION && - rangeTableEntry->rtekind != RTE_SUBQUERY) - { - unsupporteTableCombination = true; - errorDetail = "Table expressions other than simple relations and " - "subqueries are currently unsupported"; - break; - } - } - - /* finally check and error out if not satisfied */ - if (unsupporteTableCombination) - { - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("cannot push down this subquery"), - errdetail("%s", errorDetail))); - } -} - - /* * GroupTargetEntryList walks over group clauses in the given list, finds * matching target entries and return them in a new list. @@ -3216,53 +2823,6 @@ GroupTargetEntryList(List *groupClauseList, List *targetEntryList) } -/* - * TargetListOnPartitionColumn checks if at least one target list entry is on - * partition column. - */ -static bool -TargetListOnPartitionColumn(Query *query, List *targetEntryList) -{ - bool targetListOnPartitionColumn = false; - List *compositeFieldList = NIL; - - ListCell *targetEntryCell = NULL; - foreach(targetEntryCell, targetEntryList) - { - TargetEntry *targetEntry = (TargetEntry *) lfirst(targetEntryCell); - Expr *targetExpression = targetEntry->expr; - - bool isPartitionColumn = IsPartitionColumn(targetExpression, query); - if (isPartitionColumn) - { - FieldSelect *compositeField = CompositeFieldRecursive(targetExpression, - query); - if (compositeField) - { - compositeFieldList = lappend(compositeFieldList, compositeField); - } - else - { - targetListOnPartitionColumn = true; - break; - } - } - } - - /* check composite fields */ - if (!targetListOnPartitionColumn) - { - bool fullCompositeFieldList = FullCompositeFieldList(compositeFieldList); - if (fullCompositeFieldList) - { - targetListOnPartitionColumn = true; - } - } - - return targetListOnPartitionColumn; -} - - /* * IsPartitionColumn returns true if the given column is a partition column. * The function uses FindReferencedTableColumn to find the original relation @@ -3416,265 +2976,9 @@ FindReferencedTableColumn(Expr *columnExpression, List *parentQueryList, Query * } -/* - * CompositeFieldRecursive recursively finds composite field in the query tree - * referred by given expression. If expression does not refer to a composite - * field, then it returns NULL. - * - * If expression is a field select we directly return composite field. If it is - * a column is referenced from a subquery, then we recursively check that subquery - * until we reach the source of that column, and find composite field. 
If this - * column is referenced from join range table entry, then we resolve which join - * column it refers and recursively use this column with the same query. - */ -static FieldSelect * -CompositeFieldRecursive(Expr *expression, Query *query) -{ - FieldSelect *compositeField = NULL; - List *rangetableList = query->rtable; - Index rangeTableEntryIndex = 0; - RangeTblEntry *rangeTableEntry = NULL; - Var *candidateColumn = NULL; - - if (IsA(expression, FieldSelect)) - { - compositeField = (FieldSelect *) expression; - return compositeField; - } - - if (IsA(expression, Var)) - { - candidateColumn = (Var *) expression; - } - else - { - return NULL; - } - - rangeTableEntryIndex = candidateColumn->varno - 1; - rangeTableEntry = list_nth(rangetableList, rangeTableEntryIndex); - - if (rangeTableEntry->rtekind == RTE_SUBQUERY) - { - Query *subquery = rangeTableEntry->subquery; - List *targetEntryList = subquery->targetList; - AttrNumber targetEntryIndex = candidateColumn->varattno - 1; - TargetEntry *subqueryTargetEntry = list_nth(targetEntryList, targetEntryIndex); - - Expr *subqueryExpression = subqueryTargetEntry->expr; - compositeField = CompositeFieldRecursive(subqueryExpression, subquery); - } - else if (rangeTableEntry->rtekind == RTE_JOIN) - { - List *joinColumnList = rangeTableEntry->joinaliasvars; - AttrNumber joinColumnIndex = candidateColumn->varattno - 1; - Expr *joinColumn = list_nth(joinColumnList, joinColumnIndex); - - compositeField = CompositeFieldRecursive(joinColumn, query); - } - - return compositeField; -} - - -/* - * FullCompositeFieldList gets a composite field list, and checks if all fields - * of composite type are used in the list. - */ -static bool -FullCompositeFieldList(List *compositeFieldList) -{ - bool fullCompositeFieldList = true; - bool *compositeFieldArray = NULL; - uint32 compositeFieldCount = 0; - uint32 fieldIndex = 0; - - ListCell *fieldSelectCell = NULL; - foreach(fieldSelectCell, compositeFieldList) - { - FieldSelect *fieldSelect = (FieldSelect *) lfirst(fieldSelectCell); - uint32 compositeFieldIndex = 0; - - Expr *fieldExpression = fieldSelect->arg; - if (!IsA(fieldExpression, Var)) - { - continue; - } - - if (compositeFieldArray == NULL) - { - uint32 index = 0; - Var *compositeColumn = (Var *) fieldExpression; - Oid compositeTypeId = compositeColumn->vartype; - Oid compositeRelationId = get_typ_typrelid(compositeTypeId); - - /* get composite type attribute count */ - Relation relation = relation_open(compositeRelationId, AccessShareLock); - compositeFieldCount = relation->rd_att->natts; - compositeFieldArray = palloc0(compositeFieldCount * sizeof(bool)); - relation_close(relation, AccessShareLock); - - for (index = 0; index < compositeFieldCount; index++) - { - compositeFieldArray[index] = false; - } - } - - compositeFieldIndex = fieldSelect->fieldnum - 1; - compositeFieldArray[compositeFieldIndex] = true; - } - - for (fieldIndex = 0; fieldIndex < compositeFieldCount; fieldIndex++) - { - if (!compositeFieldArray[fieldIndex]) - { - fullCompositeFieldList = false; - } - } - - if (compositeFieldCount == 0) - { - fullCompositeFieldList = false; - } - - return fullCompositeFieldList; -} - - -/* - * ErrorIfUnsupportedShardDistribution gets list of relations in the given query - * and checks if two conditions below hold for them, otherwise it errors out. - * a. Every relation is distributed by range or hash. This means shards are - * disjoint based on the partition column. - * b. All relations have 1-to-1 shard partitioning between them. 
This means - * shard count for every relation is same and for every shard in a relation - * there is exactly one shard in other relations with same min/max values. - */ -static void -ErrorIfUnsupportedShardDistribution(Query *query) -{ - Oid firstTableRelationId = InvalidOid; - List *relationIdList = RelationIdList(query); - ListCell *relationIdCell = NULL; - uint32 relationIndex = 0; - uint32 rangeDistributedRelationCount = 0; - uint32 hashDistributedRelationCount = 0; - - foreach(relationIdCell, relationIdList) - { - Oid relationId = lfirst_oid(relationIdCell); - char partitionMethod = PartitionMethod(relationId); - if (partitionMethod == DISTRIBUTE_BY_RANGE) - { - rangeDistributedRelationCount++; - } - else if (partitionMethod == DISTRIBUTE_BY_HASH) - { - hashDistributedRelationCount++; - } - else - { - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("cannot push down this subquery"), - errdetail("Currently range and hash partitioned " - "relations are supported"))); - } - } - - if ((rangeDistributedRelationCount > 0) && (hashDistributedRelationCount > 0)) - { - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("cannot push down this subquery"), - errdetail("A query including both range and hash " - "partitioned relations are unsupported"))); - } - - foreach(relationIdCell, relationIdList) - { - Oid relationId = lfirst_oid(relationIdCell); - bool coPartitionedTables = false; - Oid currentRelationId = relationId; - - /* get shard list of first relation and continue for the next relation */ - if (relationIndex == 0) - { - firstTableRelationId = relationId; - relationIndex++; - - continue; - } - - /* check if this table has 1-1 shard partitioning with first table */ - coPartitionedTables = CoPartitionedTables(firstTableRelationId, - currentRelationId); - if (!coPartitionedTables) - { - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("cannot push down this subquery"), - errdetail("Shards of relations in subquery need to " - "have 1-to-1 shard partitioning"))); - } - } -} - - -/* - * RelationIdList returns list of unique relation ids in query tree. - */ -static List * -RelationIdList(Query *query) -{ - List *rangeTableList = NIL; - List *tableEntryList = NIL; - List *relationIdList = NIL; - ListCell *tableEntryCell = NULL; - - ExtractRangeTableRelationWalker((Node *) query, &rangeTableList); - tableEntryList = TableEntryList(rangeTableList); - - foreach(tableEntryCell, tableEntryList) - { - TableEntry *tableEntry = (TableEntry *) lfirst(tableEntryCell); - Oid relationId = tableEntry->relationId; - - relationIdList = list_append_unique_oid(relationIdList, relationId); - } - - return relationIdList; -} - - /* * CoPartitionedTables checks if given two distributed tables have 1-to-1 * shard partitioning. 
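The distribution-method rule that the removed ErrorIfUnsupportedShardDistribution enforced reduces to a small classification pass: every relation referenced by the subquery must be range- or hash-distributed, and the two methods may not be mixed in a single query. A minimal standalone sketch of that rule, with an invented PartitionMethod enum standing in for the catalog metadata Citus actually consults:

#include <stddef.h>

typedef enum
{
	PARTITION_RANGE,
	PARTITION_HASH,
	PARTITION_APPEND
} PartitionMethod;

/* Returns NULL when the relation set is supported, or a reason string. */
static const char *
CheckShardDistribution(const PartitionMethod *methods, size_t relationCount)
{
	size_t rangeCount = 0;
	size_t hashCount = 0;

	for (size_t i = 0; i < relationCount; i++)
	{
		if (methods[i] == PARTITION_RANGE)
		{
			rangeCount++;
		}
		else if (methods[i] == PARTITION_HASH)
		{
			hashCount++;
		}
		else
		{
			return "only range and hash partitioned relations are supported";
		}
	}

	if (rangeCount > 0 && hashCount > 0)
	{
		return "mixing range and hash partitioned relations is unsupported";
	}

	return NULL;
}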
- */ -static bool -CoPartitionedTables(Oid firstRelationId, Oid secondRelationId) -{ - bool coPartitionedTables = true; - uint32 intervalIndex = 0; - DistTableCacheEntry *firstTableCache = DistributedTableCacheEntry(firstRelationId); - DistTableCacheEntry *secondTableCache = DistributedTableCacheEntry(secondRelationId); - ShardInterval **sortedFirstIntervalArray = firstTableCache->sortedShardIntervalArray; - ShardInterval **sortedSecondIntervalArray = - secondTableCache->sortedShardIntervalArray; - uint32 firstListShardCount = firstTableCache->shardIntervalArrayLength; - uint32 secondListShardCount = secondTableCache->shardIntervalArrayLength; - FmgrInfo *comparisonFunction = firstTableCache->shardIntervalCompareFunction; - - if (firstListShardCount != secondListShardCount) - { - return false; - } - - /* if there are not any shards just return true */ - if (firstListShardCount == 0) - { - return true; - } - - Assert(comparisonFunction != NULL); - /* * Check if the tables have the same colocation ID - if so, we know * they're colocated. @@ -3691,126 +2995,6 @@ CoPartitionedTables(Oid firstRelationId, Oid secondRelationId) * interval minimum values. Then it compares every shard interval in order * and if any pair of shard intervals are not equal it returns false. */ - for (intervalIndex = 0; intervalIndex < firstListShardCount; intervalIndex++) - { - ShardInterval *firstInterval = sortedFirstIntervalArray[intervalIndex]; - ShardInterval *secondInterval = sortedSecondIntervalArray[intervalIndex]; - - bool shardIntervalsEqual = ShardIntervalsEqual(comparisonFunction, - firstInterval, - secondInterval); - if (!shardIntervalsEqual) - { - coPartitionedTables = false; - break; - } - } - - return coPartitionedTables; -} - - -/* - * ShardIntervalsEqual checks if given shard intervals have equal min/max values. - */ -static bool -ShardIntervalsEqual(FmgrInfo *comparisonFunction, ShardInterval *firstInterval, - ShardInterval *secondInterval) -{ - bool shardIntervalsEqual = false; - Datum firstMin = 0; - Datum firstMax = 0; - Datum secondMin = 0; - Datum secondMax = 0; - - firstMin = firstInterval->minValue; - firstMax = firstInterval->maxValue; - secondMin = secondInterval->minValue; - secondMax = secondInterval->maxValue; - - if (firstInterval->minValueExists && firstInterval->maxValueExists && - secondInterval->minValueExists && secondInterval->maxValueExists) - { - Datum minDatum = CompareCall2(comparisonFunction, firstMin, secondMin); - Datum maxDatum = CompareCall2(comparisonFunction, firstMax, secondMax); - int firstComparison = DatumGetInt32(minDatum); - int secondComparison = DatumGetInt32(maxDatum); - - if (firstComparison == 0 && secondComparison == 0) - { - shardIntervalsEqual = true; - } - } - - return shardIntervalsEqual; -} - - -/* - * ErrorIfUnsupportedFilters checks if all leaf queries in the given query have - * same filter on the partition column. Note that if there are queries without - * any filter on the partition column, they don't break this prerequisite. - */ -static void -ErrorIfUnsupportedFilters(Query *subquery) -{ - List *queryList = NIL; - ListCell *queryCell = NULL; - List *subqueryOpExpressionList = NIL; - List *relationIdList = RelationIdList(subquery); - - /* - * Get relation id of any relation in the subquery and create partiton column - * for this relation. We will use this column to replace columns on operator - * expressions on different tables. Then we compare these operator expressions - * to see if they consist of same operator and constant value. 
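The 1-to-1 shard partitioning test behind CoPartitionedTables and ShardIntervalsEqual can be pictured as comparing two sorted interval arrays element by element. A rough model with integer bounds in place of the typed Datum comparisons (ShardBounds and CoPartitioned are stand-ins, not the cached metadata or comparison functions Citus uses):

#include <stdbool.h>
#include <stdint.h>

typedef struct ShardBounds
{
	int64_t minValue;
	int64_t maxValue;
} ShardBounds;

/* Both arrays are assumed sorted by minValue, like sortedShardIntervalArray. */
static bool
CoPartitioned(const ShardBounds *first, int firstCount,
			  const ShardBounds *second, int secondCount)
{
	if (firstCount != secondCount)
	{
		return false;
	}

	for (int i = 0; i < firstCount; i++)
	{
		/* every pair of intervals must agree on both bounds */
		if (first[i].minValue != second[i].minValue ||
			first[i].maxValue != second[i].maxValue)
		{
			return false;
		}
	}

	return true;	/* zero shards on both sides also counts as co-partitioned */
}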
- */ - Oid relationId = linitial_oid(relationIdList); - Var *partitionColumn = PartitionColumn(relationId, 0); - - ExtractQueryWalker((Node *) subquery, &queryList); - foreach(queryCell, queryList) - { - Query *query = (Query *) lfirst(queryCell); - List *opExpressionList = NIL; - List *newOpExpressionList = NIL; - - bool leafQuery = LeafQuery(query); - if (!leafQuery) - { - continue; - } - - opExpressionList = PartitionColumnOpExpressionList(query); - if (opExpressionList == NIL) - { - continue; - } - - newOpExpressionList = ReplaceColumnsInOpExpressionList(opExpressionList, - partitionColumn); - - if (subqueryOpExpressionList == NIL) - { - subqueryOpExpressionList = newOpExpressionList; - } - else - { - bool equalOpExpressionLists = EqualOpExpressionLists(subqueryOpExpressionList, - newOpExpressionList); - if (!equalOpExpressionLists) - { - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("cannot push down this subquery"), - errdetail("Currently all leaf queries need to " - "have same filters on partition column"))); - } - } - } -} - - -/* * ExtractQueryWalker walks over a query, and finds all queries in the query * tree and returns these queries. */ @@ -4008,51 +3192,6 @@ ReplaceColumnsInOpExpressionList(List *opExpressionList, Var *newColumn) } -/* - * EqualOpExpressionLists checks if given two operator expression lists are - * equal. - */ -static bool -EqualOpExpressionLists(List *firstOpExpressionList, List *secondOpExpressionList) -{ - bool equalOpExpressionLists = false; - ListCell *firstOpExpressionCell = NULL; - uint32 equalOpExpressionCount = 0; - uint32 firstOpExpressionCount = list_length(firstOpExpressionList); - uint32 secondOpExpressionCount = list_length(secondOpExpressionList); - - if (firstOpExpressionCount != secondOpExpressionCount) - { - return false; - } - - foreach(firstOpExpressionCell, firstOpExpressionList) - { - OpExpr *firstOpExpression = (OpExpr *) lfirst(firstOpExpressionCell); - ListCell *secondOpExpressionCell = NULL; - - foreach(secondOpExpressionCell, secondOpExpressionList) - { - OpExpr *secondOpExpression = (OpExpr *) lfirst(secondOpExpressionCell); - bool equalExpressions = equal(firstOpExpression, secondOpExpression); - - if (equalExpressions) - { - equalOpExpressionCount++; - continue; - } - } - } - - if (equalOpExpressionCount == firstOpExpressionCount) - { - equalOpExpressionLists = true; - } - - return equalOpExpressionLists; -} - - /* * WorkerLimitCount checks if the given extended node contains a limit node, and * if that node can be pushed down. 
For this, the function checks if this limit diff --git a/src/backend/distributed/planner/multi_logical_planner.c b/src/backend/distributed/planner/multi_logical_planner.c index 9e590d7e6..a25fcb149 100644 --- a/src/backend/distributed/planner/multi_logical_planner.c +++ b/src/backend/distributed/planner/multi_logical_planner.c @@ -14,6 +14,7 @@ #include "postgres.h" +#include "access/heapam.h" #include "access/nbtree.h" #include "catalog/pg_am.h" #include "catalog/pg_class.h" @@ -23,6 +24,7 @@ #include "distributed/multi_logical_optimizer.h" #include "distributed/multi_logical_planner.h" #include "distributed/multi_physical_planner.h" +#include "distributed/relation_restriction_equivalence.h" #include "distributed/worker_protocol.h" #include "nodes/makefuncs.h" #include "nodes/nodeFuncs.h" @@ -34,6 +36,8 @@ #include "utils/datum.h" #include "utils/lsyscache.h" #include "utils/syscache.h" +#include "utils/rel.h" +#include "utils/relcache.h" /* Config variable managed via guc.c */ @@ -56,11 +60,30 @@ typedef MultiNode *(*RuleApplyFunction) (MultiNode *leftNode, MultiNode *rightNo static RuleApplyFunction RuleApplyFunctionArray[JOIN_RULE_LAST] = { 0 }; /* join rules */ /* Local functions forward declarations */ +static bool SingleRelationRepartitionSubquery(Query *queryTree); +static DeferredErrorMessage * DeferErrorIfUnsupportedSubqueryPushdown(Query * + originalQuery, + PlannerRestrictionContext + * + plannerRestrictionContext); +static DeferredErrorMessage * DeferErrorIfUnsupportedFilters(Query *subquery); +static bool EqualOpExpressionLists(List *firstOpExpressionList, + List *secondOpExpressionList); +static DeferredErrorMessage * DeferErrorIfCannotPushdownSubquery(Query *subqueryTree, + bool outerQueryHasLimit); +static DeferredErrorMessage * DeferErrorIfUnsupportedUnionQuery(Query *queryTree, + bool outerQueryHasLimit); +static bool ExtractSetOperationStatmentWalker(Node *node, List **setOperationList); +static DeferredErrorMessage * DeferErrorIfUnsupportedTableCombination(Query *queryTree); +static bool TargetListOnPartitionColumn(Query *query, List *targetEntryList); +static FieldSelect * CompositeFieldRecursive(Expr *expression, Query *query); +static bool FullCompositeFieldList(List *compositeFieldList); static MultiNode * MultiPlanTree(Query *queryTree); static void ErrorIfQueryNotSupported(Query *queryTree); static bool HasUnsupportedJoinWalker(Node *node, void *context); static bool ErrorHintRequired(const char *errorHint, Query *queryTree); -static void ErrorIfSubqueryNotSupported(Query *subqueryTree); +static DeferredErrorMessage * DeferErrorIfUnsupportedSubqueryRepartition(Query * + subqueryTree); static bool HasTablesample(Query *queryTree); static bool HasOuterJoin(Query *queryTree); static bool HasOuterJoinWalker(Node *node, void *maxJoinLevel); @@ -104,9 +127,12 @@ static MultiNode * ApplyCartesianProduct(MultiNode *leftNode, MultiNode *rightNo * Local functions forward declarations for subquery pushdown. Note that these * functions will be removed with upcoming subqery changes. 
*/ +static MultiNode * MultiSubqueryPlanTree(Query *originalQuery, + Query *queryTree, + PlannerRestrictionContext * + plannerRestrictionContext); static MultiNode * SubqueryPushdownMultiPlanTree(Query *queryTree); -static void ErrorIfSubqueryJoin(Query *queryTree); static List * CreateSubqueryTargetEntryList(List *columnList); static void UpdateVarMappingsForExtendedOpNode(List *columnList, List *subqueryTargetEntryList); @@ -118,9 +144,15 @@ static MultiTable * MultiSubqueryPushdownTable(Query *subquery); * query tree yield by the standard planner. It uses helper functions to create logical * plan and adds a root node to top of it. The original query is only used for subquery * pushdown planning. + * + * We also pass queryTree and plannerRestrictionContext to the planner. They + * are primarily used to decide whether the subquery is safe to pushdown. + * If not, it helps to produce meaningful error messages for subquery + * pushdown planning. */ MultiTreeRoot * -MultiLogicalPlanCreate(Query *originalQuery, Query *queryTree) +MultiLogicalPlanCreate(Query *originalQuery, Query *queryTree, + PlannerRestrictionContext *plannerRestrictionContext) { MultiNode *multiQueryNode = NULL; MultiTreeRoot *rootNode = NULL; @@ -134,15 +166,8 @@ MultiLogicalPlanCreate(Query *originalQuery, Query *queryTree) subqueryEntryList = SubqueryEntryList(queryTree); if (subqueryEntryList != NIL) { - if (SubqueryPushdown) - { - multiQueryNode = SubqueryPushdownMultiPlanTree(originalQuery); - } - else - { - ErrorIfSubqueryJoin(queryTree); - multiQueryNode = MultiPlanTree(queryTree); - } + multiQueryNode = MultiSubqueryPlanTree(originalQuery, queryTree, + plannerRestrictionContext); } else { @@ -157,6 +182,826 @@ MultiLogicalPlanCreate(Query *originalQuery, Query *queryTree) } +/* + * MultiSubqueryPlanTree gets the query objects and returns logical plan + * for subqueries. + * + * We currently have two different code paths for creating logic plan for subqueries: + * (i) subquery pushdown + * (ii) single relation repartition subquery + * + * In order to create the logical plan, we follow the algorithm below: + * - If subquery pushdown planner can plan the query + * - We're done, we create the multi plan tree and return + * - Else + * - If the query is not eligible for single table repartition subquery planning + * - Throw the error that the subquery pushdown planner generated + * - If it is eligible for single table repartition subquery planning + * - Check for the errors for single table repartition subquery planning + * - If no errors found, we're done. Create the multi plan and return + * - If found errors, throw it + */ +static MultiNode * +MultiSubqueryPlanTree(Query *originalQuery, Query *queryTree, + PlannerRestrictionContext *plannerRestrictionContext) +{ + MultiNode *multiQueryNode = NULL; + DeferredErrorMessage *subqueryPushdownError = NULL; + + /* + * This is a generic error check that applies to both subquery pushdown + * and single table repartition subquery. + */ + ErrorIfQueryNotSupported(originalQuery); + + /* + * In principle, we're first trying subquery pushdown planner. If it fails + * to create a logical plan, continue with trying the single table + * repartition subquery planning. 
+ */ + subqueryPushdownError = DeferErrorIfUnsupportedSubqueryPushdown(originalQuery, + plannerRestrictionContext); + if (!subqueryPushdownError) + { + multiQueryNode = SubqueryPushdownMultiPlanTree(originalQuery); + } + else if (subqueryPushdownError) + { + bool singleRelationRepartitionSubquery = false; + RangeTblEntry *subqueryRangeTableEntry = NULL; + Query *subqueryTree = NULL; + DeferredErrorMessage *repartitionQueryError = NULL; + List *subqueryEntryList = NULL; + + /* + * If not eligible for single relation repartition query, we should raise + * subquery pushdown error. + */ + singleRelationRepartitionSubquery = + SingleRelationRepartitionSubquery(originalQuery); + if (!singleRelationRepartitionSubquery) + { + RaiseDeferredErrorInternal(subqueryPushdownError, ERROR); + } + + subqueryEntryList = SubqueryEntryList(queryTree); + subqueryRangeTableEntry = (RangeTblEntry *) linitial(subqueryEntryList); + Assert(subqueryRangeTableEntry->rtekind == RTE_SUBQUERY); + + subqueryTree = subqueryRangeTableEntry->subquery; + + repartitionQueryError = DeferErrorIfUnsupportedSubqueryRepartition(subqueryTree); + if (repartitionQueryError) + { + RaiseDeferredErrorInternal(repartitionQueryError, ERROR); + } + + /* all checks has passed, safe to create the multi plan */ + multiQueryNode = MultiPlanTree(queryTree); + } + + Assert(multiQueryNode != NULL); + + return multiQueryNode; +} + + +/* + * SingleRelationRepartitionSubquery returns true if it is eligible single + * repartition query planning in the sense that: + * - None of the levels of the subquery contains a join + * - Only a single RTE_RELATION exists, which means only a single table + * name is specified on the whole query + * - No sublinks exists in the subquery + * + * Note that the caller should still call DeferErrorIfUnsupportedSubqueryRepartition() + * to ensure that Citus supports the subquery. Also, this function is designed to run + * on the original query. + */ +static bool +SingleRelationRepartitionSubquery(Query *queryTree) +{ + List *rangeTableIndexList = NULL; + RangeTblEntry *rangeTableEntry = NULL; + List *rangeTableList = queryTree->rtable; + int rangeTableIndex = 0; + + /* we don't support subqueries in WHERE */ + if (queryTree->hasSubLinks) + { + return false; + } + + /* + * Don't allow joins and set operations. If join appears in the queryTree, the + * length would be greater than 1. If only set operations exists, the length + * would be 0. + */ + ExtractRangeTableIndexWalker((Node *) queryTree->jointree, + &rangeTableIndexList); + if (list_length(rangeTableIndexList) != 1) + { + return false; + } + + rangeTableIndex = linitial_int(rangeTableIndexList); + rangeTableEntry = rt_fetch(rangeTableIndex, rangeTableList); + if (rangeTableEntry->rtekind == RTE_RELATION) + { + return true; + } + else if (rangeTableEntry->rtekind == RTE_SUBQUERY) + { + Query *subqueryTree = rangeTableEntry->subquery; + + return SingleRelationRepartitionSubquery(subqueryTree); + } + + return false; +} + + +/* + * DeferErrorIfContainsUnsupportedSubqueryPushdown iterates on the query's subquery + * entry list and uses helper functions to check if we can push down subquery + * to worker nodes. These helper functions returns a deferred error if we + * cannot push down the subquery. 
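The control flow introduced by MultiSubqueryPlanTree is easiest to see stripped of the planner types: try pushdown first, and only consult the single-relation repartition path when the query shape allows it, otherwise surface the deferred pushdown error. A compact sketch under those assumptions (DeferredMessage, Plan, and PlanSubquery are illustrative stand-ins, and exit() models ereport(ERROR)):

#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>

typedef struct DeferredMessage
{
	const char *text;
} DeferredMessage;

typedef struct Plan
{
	const char *strategy;
} Plan;

static Plan *
PlanSubquery(const DeferredMessage *pushdownError, bool eligibleForRepartition,
			 const DeferredMessage *repartitionError)
{
	Plan *plan = malloc(sizeof(Plan));
	if (plan == NULL)
	{
		exit(1);
	}

	if (pushdownError == NULL)
	{
		/* preferred path: the pushdown planner accepted the query */
		plan->strategy = "subquery pushdown";
		return plan;
	}

	if (!eligibleForRepartition)
	{
		/* no fallback available: surface the original pushdown error */
		fprintf(stderr, "ERROR: %s\n", pushdownError->text);
		exit(1);
	}

	if (repartitionError != NULL)
	{
		fprintf(stderr, "ERROR: %s\n", repartitionError->text);
		exit(1);
	}

	plan->strategy = "single relation repartition";
	return plan;
}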
+ */ +static DeferredErrorMessage * +DeferErrorIfUnsupportedSubqueryPushdown(Query *originalQuery, + PlannerRestrictionContext * + plannerRestrictionContext) +{ + ListCell *rangeTableEntryCell = NULL; + List *subqueryEntryList = NIL; + bool outerQueryHasLimit = false; + DeferredErrorMessage *error = NULL; + RelationRestrictionContext *relationRestrictionContext = + plannerRestrictionContext->relationRestrictionContext; + + if (originalQuery->limitCount != NULL) + { + outerQueryHasLimit = true; + } + + /* + * We're checking two things here: + * (i) If the query contains a top level union, ensure that all leaves + * return the partition key at the same position + * (ii) Else, check whether all relations joined on the partition key or not + */ + if (ContainsUnionSubquery(originalQuery)) + { + if (!SafeToPushdownUnionSubquery(relationRestrictionContext)) + { + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "cannot pushdown the subquery since all leaves of " + "the UNION does not include partition key at the " + "same position", + "Each leaf query of the UNION should return " + "partition key at the same position on its " + "target list.", NULL); + } + } + else if (!RestrictionEquivalenceForPartitionKeys(plannerRestrictionContext)) + { + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "cannot pushdown the subquery since all relations are not " + "joined using distribution keys", + "Each relation should be joined with at least " + "one another relation using distribution keys and " + "equality operator.", NULL); + } + + subqueryEntryList = SubqueryEntryList(originalQuery); + foreach(rangeTableEntryCell, subqueryEntryList) + { + RangeTblEntry *rangeTableEntry = lfirst(rangeTableEntryCell); + Query *subquery = rangeTableEntry->subquery; + + error = DeferErrorIfCannotPushdownSubquery(subquery, outerQueryHasLimit); + if (error) + { + return error; + } + + error = DeferErrorIfUnsupportedFilters(subquery); + if (error) + { + return error; + } + } + + return NULL; +} + + +/* + * DeferErrorIfUnsupportedFilters checks if all leaf queries in the given query have + * same filter on the partition column. Note that if there are queries without + * any filter on the partition column, they don't break this prerequisite. + */ +static DeferredErrorMessage * +DeferErrorIfUnsupportedFilters(Query *subquery) +{ + List *queryList = NIL; + ListCell *queryCell = NULL; + List *subqueryOpExpressionList = NIL; + List *relationIdList = RelationIdList(subquery); + + /* + * Get relation id of any relation in the subquery and create partiton column + * for this relation. We will use this column to replace columns on operator + * expressions on different tables. Then we compare these operator expressions + * to see if they consist of same operator and constant value. 
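The top-level decision in DeferErrorIfUnsupportedSubqueryPushdown amounts to one branch: a query with a top-level UNION needs every leaf to expose the partition key at the same position, and any other query needs all relations joined on their distribution keys. A toy version of that branch, with the restriction facts the planner has already computed folded into a plain struct (all names here are invented for illustration):

#include <stdbool.h>

typedef struct PushdownSafetyInput
{
	bool containsTopLevelUnion;				/* UNION at the top of the query */
	bool unionLeavesAlignOnPartitionKey;	/* partition key at the same target position */
	bool joinsEquivalentOnPartitionKey;		/* all relations joined on distribution keys */
} PushdownSafetyInput;

/* Returns NULL when pushdown is safe, or the reason it is not. */
static const char *
PushdownSafetyCheck(const PushdownSafetyInput *input)
{
	if (input->containsTopLevelUnion)
	{
		if (!input->unionLeavesAlignOnPartitionKey)
		{
			return "every UNION leaf must return the partition key at the "
				   "same target-list position";
		}
	}
	else if (!input->joinsEquivalentOnPartitionKey)
	{
		return "every relation must be joined to another relation on its "
			   "distribution key with an equality operator";
	}

	return NULL;
}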
+ */ + Oid relationId = linitial_oid(relationIdList); + Var *partitionColumn = PartitionColumn(relationId, 0); + + ExtractQueryWalker((Node *) subquery, &queryList); + foreach(queryCell, queryList) + { + Query *query = (Query *) lfirst(queryCell); + List *opExpressionList = NIL; + List *newOpExpressionList = NIL; + + bool leafQuery = LeafQuery(query); + if (!leafQuery) + { + continue; + } + + opExpressionList = PartitionColumnOpExpressionList(query); + if (opExpressionList == NIL) + { + continue; + } + + newOpExpressionList = ReplaceColumnsInOpExpressionList(opExpressionList, + partitionColumn); + + if (subqueryOpExpressionList == NIL) + { + subqueryOpExpressionList = newOpExpressionList; + } + else + { + bool equalOpExpressionLists = EqualOpExpressionLists(subqueryOpExpressionList, + newOpExpressionList); + if (!equalOpExpressionLists) + { + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "cannot push down this subquery", + "Currently all leaf queries need to " + "have same filters on partition column", NULL); + } + } + } + + return NULL; +} + + +/* + * EqualOpExpressionLists checks if given two operator expression lists are + * equal. + */ +static bool +EqualOpExpressionLists(List *firstOpExpressionList, List *secondOpExpressionList) +{ + bool equalOpExpressionLists = false; + ListCell *firstOpExpressionCell = NULL; + uint32 equalOpExpressionCount = 0; + uint32 firstOpExpressionCount = list_length(firstOpExpressionList); + uint32 secondOpExpressionCount = list_length(secondOpExpressionList); + + if (firstOpExpressionCount != secondOpExpressionCount) + { + return false; + } + + foreach(firstOpExpressionCell, firstOpExpressionList) + { + OpExpr *firstOpExpression = (OpExpr *) lfirst(firstOpExpressionCell); + ListCell *secondOpExpressionCell = NULL; + + foreach(secondOpExpressionCell, secondOpExpressionList) + { + OpExpr *secondOpExpression = (OpExpr *) lfirst(secondOpExpressionCell); + bool equalExpressions = equal(firstOpExpression, secondOpExpression); + + if (equalExpressions) + { + equalOpExpressionCount++; + continue; + } + } + } + + if (equalOpExpressionCount == firstOpExpressionCount) + { + equalOpExpressionLists = true; + } + + return equalOpExpressionLists; +} + + +/* + * DeferErrorIfCannotPushdownSubquery recursively checks if we can push down the given + * subquery to worker nodes. If we cannot push down the subquery, this function + * returns a deferred error. + * + * We can push down a subquery if it follows rules below. We support nested queries + * as long as they follow the same rules, and we recurse to validate each subquery + * for this given query. + * a. If there is an aggregate, it must be grouped on partition column. + * b. If there is a join, it must be between two regular tables or two subqueries. + * We don't support join between a regular table and a subquery. And columns on + * the join condition must be partition columns. + * c. If there is a distinct clause, it must be on the partition column. + * + * This function is very similar to ErrorIfQueryNotSupported() in logical + * planner, but we don't reuse it, because differently for subqueries we support + * a subset of distinct, union and left joins. + * + * Note that this list of checks is not exhaustive, there can be some cases + * which we let subquery to run but returned results would be wrong. Such as if + * a subquery has a group by on another subquery which includes order by with + * limit, we let this query to run, but results could be wrong depending on the + * features of underlying tables. 
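The filter-compatibility rule checked by DeferErrorIfUnsupportedFilters and EqualOpExpressionLists can be modelled as comparing two sets of (operator, constant) restrictions on the partition column: same length, and every clause of one list has an equal clause in the other. A simplified standalone version of that comparison (FilterClause is a stand-in for the normalized OpExpr nodes):

#include <stdbool.h>
#include <stddef.h>

typedef struct FilterClause
{
	unsigned int operatorId;	/* operator applied to the partition column */
	long constantValue;			/* the comparison constant */
} FilterClause;

static bool
EqualFilterLists(const FilterClause *first, size_t firstCount,
				 const FilterClause *second, size_t secondCount)
{
	if (firstCount != secondCount)
	{
		return false;
	}

	/* every clause of the first list must have an equal clause in the second */
	for (size_t i = 0; i < firstCount; i++)
	{
		bool found = false;

		for (size_t j = 0; j < secondCount; j++)
		{
			if (first[i].operatorId == second[j].operatorId &&
				first[i].constantValue == second[j].constantValue)
			{
				found = true;
				break;
			}
		}

		if (!found)
		{
			return false;
		}
	}

	return true;
}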
+ */ +static DeferredErrorMessage * +DeferErrorIfCannotPushdownSubquery(Query *subqueryTree, bool outerQueryHasLimit) +{ + bool preconditionsSatisfied = true; + char *errorDetail = NULL; + List *subqueryEntryList = NIL; + ListCell *rangeTableEntryCell = NULL; + DeferredErrorMessage *deferredError = NULL; + + deferredError = DeferErrorIfUnsupportedTableCombination(subqueryTree); + if (deferredError) + { + return deferredError; + } + + if (subqueryTree->hasSubLinks) + { + preconditionsSatisfied = false; + errorDetail = "Subqueries other than from-clause subqueries are unsupported"; + } + + if (subqueryTree->rtable == NIL) + { + preconditionsSatisfied = false; + errorDetail = "Subqueries without relations are unsupported"; + } + + if (subqueryTree->hasWindowFuncs) + { + preconditionsSatisfied = false; + errorDetail = "Window functions are currently unsupported"; + } + + if (subqueryTree->limitOffset) + { + preconditionsSatisfied = false; + errorDetail = "Offset clause is currently unsupported"; + } + + if (subqueryTree->limitCount && !outerQueryHasLimit) + { + preconditionsSatisfied = false; + errorDetail = "Limit in subquery without limit in the outer query is unsupported"; + } + + if (subqueryTree->setOperations) + { + deferredError = DeferErrorIfUnsupportedUnionQuery(subqueryTree, + outerQueryHasLimit); + if (deferredError) + { + return deferredError; + } + } + + if (subqueryTree->hasRecursive) + { + preconditionsSatisfied = false; + errorDetail = "Recursive queries are currently unsupported"; + } + + if (subqueryTree->cteList) + { + preconditionsSatisfied = false; + errorDetail = "Common Table Expressions are currently unsupported"; + } + + if (subqueryTree->hasForUpdate) + { + preconditionsSatisfied = false; + errorDetail = "For Update/Share commands are currently unsupported"; + } + + /* group clause list must include partition column */ + if (subqueryTree->groupClause) + { + List *groupClauseList = subqueryTree->groupClause; + List *targetEntryList = subqueryTree->targetList; + List *groupTargetEntryList = GroupTargetEntryList(groupClauseList, + targetEntryList); + bool groupOnPartitionColumn = TargetListOnPartitionColumn(subqueryTree, + groupTargetEntryList); + if (!groupOnPartitionColumn) + { + preconditionsSatisfied = false; + errorDetail = "Group by list without partition column is currently " + "unsupported"; + } + } + + /* we don't support aggregates without group by */ + if (subqueryTree->hasAggs && (subqueryTree->groupClause == NULL)) + { + preconditionsSatisfied = false; + errorDetail = "Aggregates without group by are currently unsupported"; + } + + /* having clause without group by on partition column is not supported */ + if (subqueryTree->havingQual && (subqueryTree->groupClause == NULL)) + { + preconditionsSatisfied = false; + errorDetail = "Having qual without group by on partition column is " + "currently unsupported"; + } + + /* distinct clause list must include partition column */ + if (subqueryTree->distinctClause) + { + List *distinctClauseList = subqueryTree->distinctClause; + List *targetEntryList = subqueryTree->targetList; + List *distinctTargetEntryList = GroupTargetEntryList(distinctClauseList, + targetEntryList); + bool distinctOnPartitionColumn = + TargetListOnPartitionColumn(subqueryTree, distinctTargetEntryList); + if (!distinctOnPartitionColumn) + { + preconditionsSatisfied = false; + errorDetail = "Distinct on columns without partition column is " + "currently unsupported"; + } + } + + /* finally check and return deferred if not satisfied */ + if 
(!preconditionsSatisfied) + { + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "cannot push down this subquery", + errorDetail, NULL); + } + + /* recursively do same check for subqueries of this query */ + subqueryEntryList = SubqueryEntryList(subqueryTree); + foreach(rangeTableEntryCell, subqueryEntryList) + { + RangeTblEntry *rangeTableEntry = + (RangeTblEntry *) lfirst(rangeTableEntryCell); + + Query *innerSubquery = rangeTableEntry->subquery; + deferredError = DeferErrorIfCannotPushdownSubquery(innerSubquery, + outerQueryHasLimit); + if (deferredError) + { + return deferredError; + } + } + + return NULL; +} + + +/* + * DeferErrorIfUnsupportedUnionQuery is a helper function for ErrorIfCannotPushdownSubquery(). + * It basically iterates over the subqueries that reside under the given set operations. + * + * The function also errors out for set operations INTERSECT and EXCEPT. + */ +static DeferredErrorMessage * +DeferErrorIfUnsupportedUnionQuery(Query *subqueryTree, + bool outerQueryHasLimit) +{ + List *rangeTableIndexList = NIL; + ListCell *rangeTableIndexCell = NULL; + List *setOperationStatementList = NIL; + ListCell *setOperationStatmentCell = NULL; + List *rangeTableList = subqueryTree->rtable; + + ExtractSetOperationStatmentWalker((Node *) subqueryTree->setOperations, + &setOperationStatementList); + foreach(setOperationStatmentCell, setOperationStatementList) + { + SetOperationStmt *setOperation = + (SetOperationStmt *) lfirst(setOperationStatmentCell); + + if (setOperation->op != SETOP_UNION) + { + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "cannot push down this subquery", + "Intersect and Except are currently unsupported", NULL); + } + } + + ExtractRangeTableIndexWalker((Node *) subqueryTree->setOperations, + &rangeTableIndexList); + foreach(rangeTableIndexCell, rangeTableIndexList) + { + int rangeTableIndex = lfirst_int(rangeTableIndexCell); + RangeTblEntry *rangeTableEntry = rt_fetch(rangeTableIndex, rangeTableList); + DeferredErrorMessage *deferredError = NULL; + + Assert(rangeTableEntry->rtekind == RTE_SUBQUERY); + + deferredError = DeferErrorIfCannotPushdownSubquery(rangeTableEntry->subquery, + outerQueryHasLimit); + if (deferredError) + { + return deferredError; + } + } + + return NULL; +} + + +/* + * ExtractSetOperationStatementWalker walks over a set operations statment, + * and finds all set operations in the tree. + */ +static bool +ExtractSetOperationStatmentWalker(Node *node, List **setOperationList) +{ + bool walkerResult = false; + if (node == NULL) + { + return false; + } + + if (IsA(node, SetOperationStmt)) + { + SetOperationStmt *setOperation = (SetOperationStmt *) node; + + (*setOperationList) = lappend(*setOperationList, setOperation); + } + + walkerResult = expression_tree_walker(node, ExtractSetOperationStatmentWalker, + setOperationList); + + return walkerResult; +} + + +/* + * DeferErrorIfUnsupportedTableCombination checks if the given query tree contains any + * unsupported range table combinations. For this, the function walks over all + * range tables in the join tree, and checks if they correspond to simple relations + * or subqueries. It also checks if there is a join between a regular table and + * a subquery and if join is on more than two range table entries. If any error is found, + * a deferred error is returned. Else, NULL is returned. 
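The UNION-only rule in DeferErrorIfUnsupportedUnionQuery boils down to walking the set-operation tree and rejecting any INTERSECT or EXCEPT node before recursing into the member queries. A toy recursive walk over a stand-in node type (SetOpNode is invented; the real code walks SetOperationStmt nodes with expression_tree_walker):

#include <stdbool.h>
#include <stddef.h>

typedef enum
{
	SET_OP_UNION,
	SET_OP_INTERSECT,
	SET_OP_EXCEPT,
	SET_OP_LEAF				/* a member query rather than an operator */
} SetOpKind;

typedef struct SetOpNode
{
	SetOpKind kind;
	struct SetOpNode *larg;
	struct SetOpNode *rarg;
} SetOpNode;

static bool
OnlyUnionsInTree(const SetOpNode *node)
{
	if (node == NULL || node->kind == SET_OP_LEAF)
	{
		return true;
	}

	if (node->kind != SET_OP_UNION)
	{
		return false;		/* INTERSECT and EXCEPT are rejected */
	}

	return OnlyUnionsInTree(node->larg) && OnlyUnionsInTree(node->rarg);
}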
+ */ +static DeferredErrorMessage * +DeferErrorIfUnsupportedTableCombination(Query *queryTree) +{ + List *rangeTableList = queryTree->rtable; + List *joinTreeTableIndexList = NIL; + ListCell *joinTreeTableIndexCell = NULL; + bool unsupporteTableCombination = false; + char *errorDetail = NULL; + uint32 relationRangeTableCount = 0; + uint32 subqueryRangeTableCount = 0; + + /* + * Extract all range table indexes from the join tree. Note that sub-queries + * that get pulled up by PostgreSQL don't appear in this join tree. + */ + ExtractRangeTableIndexWalker((Node *) queryTree->jointree, &joinTreeTableIndexList); + foreach(joinTreeTableIndexCell, joinTreeTableIndexList) + { + /* + * Join tree's range table index starts from 1 in the query tree. But, + * list indexes start from 0. + */ + int joinTreeTableIndex = lfirst_int(joinTreeTableIndexCell); + int rangeTableListIndex = joinTreeTableIndex - 1; + + RangeTblEntry *rangeTableEntry = + (RangeTblEntry *) list_nth(rangeTableList, rangeTableListIndex); + + /* + * Check if the range table in the join tree is a simple relation or a + * subquery. + */ + if (rangeTableEntry->rtekind == RTE_RELATION) + { + relationRangeTableCount++; + } + else if (rangeTableEntry->rtekind == RTE_SUBQUERY) + { + subqueryRangeTableCount++; + } + else + { + unsupporteTableCombination = true; + errorDetail = "Table expressions other than simple relations and " + "subqueries are currently unsupported"; + break; + } + } + + /* finally check and error out if not satisfied */ + if (unsupporteTableCombination) + { + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "cannot push down this subquery", + errorDetail, NULL); + } + + return NULL; +} + + +/* + * TargetListOnPartitionColumn checks if at least one target list entry is on + * partition column. + */ +static bool +TargetListOnPartitionColumn(Query *query, List *targetEntryList) +{ + bool targetListOnPartitionColumn = false; + List *compositeFieldList = NIL; + + ListCell *targetEntryCell = NULL; + foreach(targetEntryCell, targetEntryList) + { + TargetEntry *targetEntry = (TargetEntry *) lfirst(targetEntryCell); + Expr *targetExpression = targetEntry->expr; + + bool isPartitionColumn = IsPartitionColumn(targetExpression, query); + if (isPartitionColumn) + { + FieldSelect *compositeField = CompositeFieldRecursive(targetExpression, + query); + if (compositeField) + { + compositeFieldList = lappend(compositeFieldList, compositeField); + } + else + { + targetListOnPartitionColumn = true; + break; + } + } + } + + /* check composite fields */ + if (!targetListOnPartitionColumn) + { + bool fullCompositeFieldList = FullCompositeFieldList(compositeFieldList); + if (fullCompositeFieldList) + { + targetListOnPartitionColumn = true; + } + } + + return targetListOnPartitionColumn; +} + + +/* + * FullCompositeFieldList gets a composite field list, and checks if all fields + * of composite type are used in the list. 
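The range-table combination rule keeps only plain relations and subqueries in the join tree; anything else (functions, VALUES lists, CTE references, and so on) produces a deferred error, while the new code also tallies relation and subquery entries for later join checks. A minimal sketch of the classification step with an invented RangeTableKind enum:

#include <stddef.h>

typedef enum
{
	RANGE_TABLE_RELATION,
	RANGE_TABLE_SUBQUERY,
	RANGE_TABLE_OTHER		/* functions, VALUES lists, CTE references, ... */
} RangeTableKind;

/* Returns NULL when the combination is accepted, or a reason string. */
static const char *
CheckTableCombination(const RangeTableKind *joinTreeEntries, size_t entryCount)
{
	for (size_t i = 0; i < entryCount; i++)
	{
		if (joinTreeEntries[i] != RANGE_TABLE_RELATION &&
			joinTreeEntries[i] != RANGE_TABLE_SUBQUERY)
		{
			return "table expressions other than simple relations and "
				   "subqueries are currently unsupported";
		}
	}

	return NULL;
}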
+ */ +static bool +FullCompositeFieldList(List *compositeFieldList) +{ + bool fullCompositeFieldList = true; + bool *compositeFieldArray = NULL; + uint32 compositeFieldCount = 0; + uint32 fieldIndex = 0; + + ListCell *fieldSelectCell = NULL; + foreach(fieldSelectCell, compositeFieldList) + { + FieldSelect *fieldSelect = (FieldSelect *) lfirst(fieldSelectCell); + uint32 compositeFieldIndex = 0; + + Expr *fieldExpression = fieldSelect->arg; + if (!IsA(fieldExpression, Var)) + { + continue; + } + + if (compositeFieldArray == NULL) + { + uint32 index = 0; + Var *compositeColumn = (Var *) fieldExpression; + Oid compositeTypeId = compositeColumn->vartype; + Oid compositeRelationId = get_typ_typrelid(compositeTypeId); + + /* get composite type attribute count */ + Relation relation = relation_open(compositeRelationId, AccessShareLock); + compositeFieldCount = relation->rd_att->natts; + compositeFieldArray = palloc0(compositeFieldCount * sizeof(bool)); + relation_close(relation, AccessShareLock); + + for (index = 0; index < compositeFieldCount; index++) + { + compositeFieldArray[index] = false; + } + } + + compositeFieldIndex = fieldSelect->fieldnum - 1; + compositeFieldArray[compositeFieldIndex] = true; + } + + for (fieldIndex = 0; fieldIndex < compositeFieldCount; fieldIndex++) + { + if (!compositeFieldArray[fieldIndex]) + { + fullCompositeFieldList = false; + } + } + + if (compositeFieldCount == 0) + { + fullCompositeFieldList = false; + } + + return fullCompositeFieldList; +} + + +/* + * CompositeFieldRecursive recursively finds composite field in the query tree + * referred by given expression. If expression does not refer to a composite + * field, then it returns NULL. + * + * If expression is a field select we directly return composite field. If it is + * a column is referenced from a subquery, then we recursively check that subquery + * until we reach the source of that column, and find composite field. If this + * column is referenced from join range table entry, then we resolve which join + * column it refers and recursively use this column with the same query. 
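FullCompositeFieldList's idea is a coverage bitmap: mark every attribute of the composite type that the target list touches and report success only when all of them are marked. A small standalone model, passing the attribute count in directly instead of opening the composite type's relation as the real code does:

#include <stdbool.h>
#include <stdlib.h>

static bool
AllCompositeFieldsUsed(const int *referencedFieldNumbers, int referenceCount,
					   int compositeFieldCount)
{
	bool allUsed = true;
	bool *fieldUsed = NULL;

	if (compositeFieldCount == 0 || referenceCount == 0)
	{
		return false;
	}

	fieldUsed = calloc(compositeFieldCount, sizeof(bool));
	if (fieldUsed == NULL)
	{
		return false;
	}

	for (int i = 0; i < referenceCount; i++)
	{
		int fieldIndex = referencedFieldNumbers[i] - 1;		/* fieldnum is 1-based */

		if (fieldIndex >= 0 && fieldIndex < compositeFieldCount)
		{
			fieldUsed[fieldIndex] = true;
		}
	}

	for (int i = 0; i < compositeFieldCount; i++)
	{
		if (!fieldUsed[i])
		{
			allUsed = false;
			break;
		}
	}

	free(fieldUsed);
	return allUsed;
}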
+ */ +static FieldSelect * +CompositeFieldRecursive(Expr *expression, Query *query) +{ + FieldSelect *compositeField = NULL; + List *rangetableList = query->rtable; + Index rangeTableEntryIndex = 0; + RangeTblEntry *rangeTableEntry = NULL; + Var *candidateColumn = NULL; + + if (IsA(expression, FieldSelect)) + { + compositeField = (FieldSelect *) expression; + return compositeField; + } + + if (IsA(expression, Var)) + { + candidateColumn = (Var *) expression; + } + else + { + return NULL; + } + + rangeTableEntryIndex = candidateColumn->varno - 1; + rangeTableEntry = list_nth(rangetableList, rangeTableEntryIndex); + + if (rangeTableEntry->rtekind == RTE_SUBQUERY) + { + Query *subquery = rangeTableEntry->subquery; + List *targetEntryList = subquery->targetList; + AttrNumber targetEntryIndex = candidateColumn->varattno - 1; + TargetEntry *subqueryTargetEntry = list_nth(targetEntryList, targetEntryIndex); + + Expr *subqueryExpression = subqueryTargetEntry->expr; + compositeField = CompositeFieldRecursive(subqueryExpression, subquery); + } + else if (rangeTableEntry->rtekind == RTE_JOIN) + { + List *joinColumnList = rangeTableEntry->joinaliasvars; + AttrNumber joinColumnIndex = candidateColumn->varattno - 1; + Expr *joinColumn = list_nth(joinColumnList, joinColumnIndex); + + compositeField = CompositeFieldRecursive(joinColumn, query); + } + + return compositeField; +} + + /* * SubqueryEntryList finds the subquery nodes in the range table entry list, and * builds a list of subquery range table entries from these subquery nodes. Range @@ -259,14 +1104,14 @@ MultiPlanTree(Query *queryTree) List *columnList = NIL; ListCell *columnCell = NULL; + /* we only support a single subquery in the entry list */ + Assert(list_length(subqueryEntryList) == 1); + subqueryRangeTableEntry = (RangeTblEntry *) linitial(subqueryEntryList); subqueryTree = subqueryRangeTableEntry->subquery; - /* check if subquery satisfies preconditons */ - ErrorIfSubqueryNotSupported(subqueryTree); - - /* check if subquery has joining tables */ - ErrorIfSubqueryJoin(subqueryTree); + /* ensure that the subquery satisfies preconditions */ + Assert(DeferErrorIfUnsupportedSubqueryRepartition(subqueryTree) == NULL); subqueryNode = CitusMakeNode(MultiTable); subqueryNode->relationId = SUBQUERY_RELATION_ID; @@ -602,14 +1447,19 @@ ErrorHintRequired(const char *errorHint, Query *queryTree) /* - * ErrorIfSubqueryNotSupported checks that we can perform distributed planning for - * the given subquery. + * DeferErrorIfUnsupportedSubqueryRepartition checks that we can perform distributed planning for + * the given subquery. If not, a deferred error is returned. The function recursively + * does this check to all lower levels of the subquery.
*/ -static void -ErrorIfSubqueryNotSupported(Query *subqueryTree) +static DeferredErrorMessage * +DeferErrorIfUnsupportedSubqueryRepartition(Query *subqueryTree) { char *errorDetail = NULL; bool preconditionsSatisfied = true; + List *joinTreeTableIndexList = NIL; + int rangeTableIndex = 0; + RangeTblEntry *rangeTableEntry = NULL; + Query *innerSubquery = NULL; if (!subqueryTree->hasAggs) { @@ -641,13 +1491,35 @@ ErrorIfSubqueryNotSupported(Query *subqueryTree) errorDetail = "Subqueries with offset are not supported yet"; } - /* finally check and error out if not satisfied */ + /* finally check and return error if conditions are not satisfied */ if (!preconditionsSatisfied) { - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("cannot perform distributed planning on this query"), - errdetail("%s", errorDetail))); + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "cannot perform distributed planning on this query", + errorDetail, NULL); } + + /* + * Extract all range table indexes from the join tree. Note that sub-queries + * that get pulled up by PostgreSQL don't appear in this join tree. + */ + ExtractRangeTableIndexWalker((Node *) subqueryTree->jointree, + &joinTreeTableIndexList); + Assert(list_length(joinTreeTableIndexList) == 1); + + /* continue with the inner subquery */ + rangeTableIndex = linitial_int(joinTreeTableIndexList); + rangeTableEntry = rt_fetch(rangeTableIndex, subqueryTree->rtable); + if (rangeTableEntry->rtekind == RTE_RELATION) + { + return NULL; + } + + Assert(rangeTableEntry->rtekind == RTE_SUBQUERY); + innerSubquery = rangeTableEntry->subquery; + + /* recursively continue to the inner subqueries */ + return DeferErrorIfUnsupportedSubqueryRepartition(innerSubquery); } @@ -2127,32 +2999,6 @@ SubqueryPushdownMultiPlanTree(Query *queryTree) } -/* - * ErrorIfSubqueryJoin errors out if the given query is a join query. Note that - * this function will not be required once we implement subquery joins. - */ -static void -ErrorIfSubqueryJoin(Query *queryTree) -{ - List *joinTreeTableIndexList = NIL; - uint32 joiningRangeTableCount = 0; - - /* - * Extract all range table indexes from the join tree. Note that sub-queries - * that get pulled up by PostgreSQL don't appear in this join tree. - */ - ExtractRangeTableIndexWalker((Node *) queryTree->jointree, &joinTreeTableIndexList); - joiningRangeTableCount = list_length(joinTreeTableIndexList); - - if (joiningRangeTableCount > 1) - { - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("cannot perform distributed planning on this query"), - errdetail("Join in subqueries is not supported yet"))); - } -} - - /* * CreateSubqueryTargetEntryList creates a target entry for each unique column * in the column list and returns the target entry list. 
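Editor's note (not part of the patch): the changes above convert hard ereport(ERROR) checks into functions that return a DeferredErrorMessage, so validation can run during planning without aborting it. A minimal sketch of how such a deferred result might be consumed follows; the caller name is hypothetical, and RaiseDeferredError is assumed to be the existing Citus helper that turns a deferred error into an ereport, so treat this only as an illustration of the pattern.

/*
 * Hypothetical caller: planning continues when the check returns NULL,
 * otherwise the stored error code, message, and detail are raised here.
 */
static void
EnsureSubqueryIsPushdownable(Query *subquery)
{
	DeferredErrorMessage *deferredError =
		DeferErrorIfUnsupportedTableCombination(subquery);

	if (deferredError != NULL)
	{
		/* assumed helper that re-raises the deferred error at ERROR level */
		RaiseDeferredError(deferredError, ERROR);
	}
}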
diff --git a/src/backend/distributed/planner/multi_physical_planner.c b/src/backend/distributed/planner/multi_physical_planner.c index cb4625e1e..05cdcddf2 100644 --- a/src/backend/distributed/planner/multi_physical_planner.c +++ b/src/backend/distributed/planner/multi_physical_planner.c @@ -124,6 +124,11 @@ static Job * BuildJobTreeTaskList(Job *jobTree, PlannerRestrictionContext *plannerRestrictionContext); static List * SubquerySqlTaskList(Job *job, PlannerRestrictionContext *plannerRestrictionContext); +static void ErrorIfUnsupportedShardDistribution(Query *query); +static bool CoPartitionedTables(Oid firstRelationId, Oid secondRelationId); +static bool ShardIntervalsEqual(FmgrInfo *comparisonFunction, + ShardInterval *firstInterval, + ShardInterval *secondInterval); static Task * SubqueryTaskCreate(Query *originalQuery, ShardInterval *shardInterval, RelationRestrictionContext *restrictionContext, uint32 taskId); @@ -2031,6 +2036,9 @@ SubquerySqlTaskList(Job *job, PlannerRestrictionContext *plannerRestrictionConte RelationRestrictionContext *relationRestrictionContext = plannerRestrictionContext->relationRestrictionContext; + /* error if shards are not co-partitioned */ + ErrorIfUnsupportedShardDistribution(subquery); + /* get list of all range tables in subquery tree */ ExtractRangeTableRelationWalker((Node *) subquery, &rangeTableList); @@ -2082,6 +2090,171 @@ SubquerySqlTaskList(Job *job, PlannerRestrictionContext *plannerRestrictionConte } +/* + * ErrorIfUnsupportedShardDistribution gets list of relations in the given query + * and checks if two conditions below hold for them, otherwise it errors out. + * a. Every relation is distributed by range or hash. This means shards are + * disjoint based on the partition column. + * b. All relations have 1-to-1 shard partitioning between them. This means + * shard count for every relation is same and for every shard in a relation + * there is exactly one shard in other relations with same min/max values. 
+ */ +static void +ErrorIfUnsupportedShardDistribution(Query *query) +{ + Oid firstTableRelationId = InvalidOid; + List *relationIdList = RelationIdList(query); + ListCell *relationIdCell = NULL; + uint32 relationIndex = 0; + uint32 rangeDistributedRelationCount = 0; + uint32 hashDistributedRelationCount = 0; + + foreach(relationIdCell, relationIdList) + { + Oid relationId = lfirst_oid(relationIdCell); + char partitionMethod = PartitionMethod(relationId); + if (partitionMethod == DISTRIBUTE_BY_RANGE) + { + rangeDistributedRelationCount++; + } + else if (partitionMethod == DISTRIBUTE_BY_HASH) + { + hashDistributedRelationCount++; + } + else + { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot push down this subquery"), + errdetail("Currently range and hash partitioned " + "relations are supported"))); + } + } + + if ((rangeDistributedRelationCount > 0) && (hashDistributedRelationCount > 0)) + { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot push down this subquery"), + errdetail("A query including both range and hash " + "partitioned relations are unsupported"))); + } + + foreach(relationIdCell, relationIdList) + { + Oid relationId = lfirst_oid(relationIdCell); + bool coPartitionedTables = false; + Oid currentRelationId = relationId; + + /* get shard list of first relation and continue for the next relation */ + if (relationIndex == 0) + { + firstTableRelationId = relationId; + relationIndex++; + + continue; + } + + /* check if this table has 1-1 shard partitioning with first table */ + coPartitionedTables = CoPartitionedTables(firstTableRelationId, + currentRelationId); + if (!coPartitionedTables) + { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot push down this subquery"), + errdetail("Shards of relations in subquery need to " + "have 1-to-1 shard partitioning"))); + } + } +} + + +/* + * CoPartitionedTables checks if given two distributed tables have 1-to-1 shard + * partitioning. It uses shard interval array that are sorted on interval minimum + * values. Then it compares every shard interval in order and if any pair of + * shard intervals are not equal it returns false. 
+ */ +static bool +CoPartitionedTables(Oid firstRelationId, Oid secondRelationId) +{ + bool coPartitionedTables = true; + uint32 intervalIndex = 0; + DistTableCacheEntry *firstTableCache = DistributedTableCacheEntry(firstRelationId); + DistTableCacheEntry *secondTableCache = DistributedTableCacheEntry(secondRelationId); + ShardInterval **sortedFirstIntervalArray = firstTableCache->sortedShardIntervalArray; + ShardInterval **sortedSecondIntervalArray = + secondTableCache->sortedShardIntervalArray; + uint32 firstListShardCount = firstTableCache->shardIntervalArrayLength; + uint32 secondListShardCount = secondTableCache->shardIntervalArrayLength; + FmgrInfo *comparisonFunction = firstTableCache->shardIntervalCompareFunction; + + if (firstListShardCount != secondListShardCount) + { + return false; + } + + /* if there are not any shards just return true */ + if (firstListShardCount == 0) + { + return true; + } + + Assert(comparisonFunction != NULL); + + for (intervalIndex = 0; intervalIndex < firstListShardCount; intervalIndex++) + { + ShardInterval *firstInterval = sortedFirstIntervalArray[intervalIndex]; + ShardInterval *secondInterval = sortedSecondIntervalArray[intervalIndex]; + + bool shardIntervalsEqual = ShardIntervalsEqual(comparisonFunction, + firstInterval, + secondInterval); + if (!shardIntervalsEqual) + { + coPartitionedTables = false; + break; + } + } + + return coPartitionedTables; +} + + +/* + * ShardIntervalsEqual checks if given shard intervals have equal min/max values. + */ +static bool +ShardIntervalsEqual(FmgrInfo *comparisonFunction, ShardInterval *firstInterval, + ShardInterval *secondInterval) +{ + bool shardIntervalsEqual = false; + Datum firstMin = 0; + Datum firstMax = 0; + Datum secondMin = 0; + Datum secondMax = 0; + + firstMin = firstInterval->minValue; + firstMax = firstInterval->maxValue; + secondMin = secondInterval->minValue; + secondMax = secondInterval->maxValue; + + if (firstInterval->minValueExists && firstInterval->maxValueExists && + secondInterval->minValueExists && secondInterval->maxValueExists) + { + Datum minDatum = CompareCall2(comparisonFunction, firstMin, secondMin); + Datum maxDatum = CompareCall2(comparisonFunction, firstMax, secondMax); + int firstComparison = DatumGetInt32(minDatum); + int secondComparison = DatumGetInt32(maxDatum); + + if (firstComparison == 0 && secondComparison == 0) + { + shardIntervalsEqual = true; + } + } + + return shardIntervalsEqual; +} + + /* * SubqueryTaskCreate creates a sql task by replacing the target * shardInterval's boundary value.. 
Then performs the normal diff --git a/src/backend/distributed/planner/multi_planner.c b/src/backend/distributed/planner/multi_planner.c index 49a81ee3b..3cebc1e53 100644 --- a/src/backend/distributed/planner/multi_planner.c +++ b/src/backend/distributed/planner/multi_planner.c @@ -310,10 +310,9 @@ CreateDistributedPlan(PlannedStmt *localPlan, Query *originalQuery, Query *query */ if ((!distributedPlan || distributedPlan->planningError) && !hasUnresolvedParams) { - /* Create and optimize logical plan */ - MultiTreeRoot *logicalPlan = MultiLogicalPlanCreate(originalQuery, query); - MultiLogicalPlanOptimize(logicalPlan, plannerRestrictionContext, - originalQuery); + MultiTreeRoot *logicalPlan = MultiLogicalPlanCreate(originalQuery, query, + plannerRestrictionContext); + MultiLogicalPlanOptimize(logicalPlan); /* * This check is here to make it likely that all node types used in diff --git a/src/backend/distributed/planner/relation_restriction_equivalence.c b/src/backend/distributed/planner/relation_restriction_equivalence.c index 3af5056a2..5fb340e17 100644 --- a/src/backend/distributed/planner/relation_restriction_equivalence.c +++ b/src/backend/distributed/planner/relation_restriction_equivalence.c @@ -1366,3 +1366,29 @@ RelationRestrictionPartitionKeyIndex(RelationRestriction *relationRestriction) return InvalidAttrNumber; } + + +/* + * RelationIdList returns list of unique relation ids in query tree. + */ +List * +RelationIdList(Query *query) +{ + List *rangeTableList = NIL; + List *tableEntryList = NIL; + List *relationIdList = NIL; + ListCell *tableEntryCell = NULL; + + ExtractRangeTableRelationWalker((Node *) query, &rangeTableList); + tableEntryList = TableEntryList(rangeTableList); + + foreach(tableEntryCell, tableEntryList) + { + TableEntry *tableEntry = (TableEntry *) lfirst(tableEntryCell); + Oid relationId = tableEntry->relationId; + + relationIdList = list_append_unique_oid(relationIdList, relationId); + } + + return relationIdList; +} diff --git a/src/include/distributed/multi_logical_optimizer.h b/src/include/distributed/multi_logical_optimizer.h index d10a9d1aa..9c3c137fe 100644 --- a/src/include/distributed/multi_logical_optimizer.h +++ b/src/include/distributed/multi_logical_optimizer.h @@ -107,9 +107,7 @@ extern double CountDistinctErrorRate; /* Function declaration for optimizing logical plans */ -extern void MultiLogicalPlanOptimize(MultiTreeRoot *multiTree, - PlannerRestrictionContext *plannerRestrictionContext, - Query *originalQuery); +extern void MultiLogicalPlanOptimize(MultiTreeRoot *multiTree); /* Function declaration for getting partition method for the given relation */ extern char PartitionMethod(Oid relationId); diff --git a/src/include/distributed/multi_logical_planner.h b/src/include/distributed/multi_logical_planner.h index 5b11a8ff3..52c676b4b 100644 --- a/src/include/distributed/multi_logical_planner.h +++ b/src/include/distributed/multi_logical_planner.h @@ -16,6 +16,7 @@ #include "distributed/citus_nodes.h" #include "distributed/multi_join_order.h" +#include "distributed/relation_restriction_equivalence.h" #include "nodes/nodes.h" #include "nodes/primnodes.h" #include "nodes/parsenodes.h" @@ -180,7 +181,9 @@ extern bool SubqueryPushdown; /* Function declarations for building logical plans */ -extern MultiTreeRoot * MultiLogicalPlanCreate(Query *originalQuery, Query *queryTree); +extern MultiTreeRoot * MultiLogicalPlanCreate(Query *originalQuery, Query *queryTree, + PlannerRestrictionContext * + plannerRestrictionContext); extern bool 
NeedsDistributedPlanning(Query *queryTree); extern MultiNode * ParentNode(MultiNode *multiNode); extern MultiNode * ChildNode(MultiUnaryNode *multiNode); diff --git a/src/include/distributed/relation_restriction_equivalence.h b/src/include/distributed/relation_restriction_equivalence.h index f06a707c5..f0fcd33e9 100644 --- a/src/include/distributed/relation_restriction_equivalence.h +++ b/src/include/distributed/relation_restriction_equivalence.h @@ -19,6 +19,7 @@ extern bool ContainsUnionSubquery(Query *queryTree); extern bool RestrictionEquivalenceForPartitionKeys(PlannerRestrictionContext * plannerRestrictionContext); extern bool SafeToPushdownUnionSubquery(RelationRestrictionContext *restrictionContext); +extern List * RelationIdList(Query *query); #endif /* RELATION_RESTRICTION_EQUIVALENCE_H */ diff --git a/src/test/regress/expected/multi_complex_expressions.out b/src/test/regress/expected/multi_complex_expressions.out index 3723cff0a..0914adb6f 100644 --- a/src/test/regress/expected/multi_complex_expressions.out +++ b/src/test/regress/expected/multi_complex_expressions.out @@ -348,16 +348,14 @@ ERROR: cannot perform local joins that involve expressions DETAIL: local joins can be performed between columns only -- Check that we can issue limit/offset queries -- OFFSET in subqueries are not supported --- Error in the planner when subquery pushdown is off +-- Error in the planner when single repartition subquery +SELECT * FROM (SELECT o_custkey FROM orders GROUP BY o_custkey ORDER BY o_custkey OFFSET 20) sq; +ERROR: cannot perform distributed planning on this query +DETAIL: Subqueries with offset are not supported yet +-- Error in the optimizer when subquery pushdown is on SELECT * FROM (SELECT o_orderkey FROM orders ORDER BY o_orderkey OFFSET 20) sq; ERROR: cannot perform distributed planning on this query DETAIL: Subqueries with offset are not supported yet -SET citus.subquery_pushdown TO true; --- Error in the optimizer when subquery pushdown is on -SELECT * FROM (SELECT o_orderkey FROM orders ORDER BY o_orderkey OFFSET 20) sq; -ERROR: cannot push down this subquery -DETAIL: Offset clause is currently unsupported -SET citus.subquery_pushdown TO false; -- Simple LIMIT/OFFSET with ORDER BY SELECT o_orderkey FROM orders ORDER BY o_orderkey LIMIT 10 OFFSET 20; o_orderkey diff --git a/src/test/regress/expected/multi_mx_router_planner.out b/src/test/regress/expected/multi_mx_router_planner.out index 74b3c0b24..37f3a8718 100644 --- a/src/test/regress/expected/multi_mx_router_planner.out +++ b/src/test/regress/expected/multi_mx_router_planner.out @@ -802,8 +802,8 @@ SELECT * FROM ( (SELECT * FROM articles_hash_mx WHERE author_id = 1) UNION (SELECT * FROM articles_hash_mx WHERE author_id = 2)) uu; -ERROR: cannot perform distributed planning on this query -DETAIL: Subqueries without group by clause are not supported yet +ERROR: cannot push down this subquery +DETAIL: Currently all leaf queries need to have same filters on partition column -- error out for queries with repartition jobs SELECT * FROM articles_hash_mx a, articles_hash_mx b diff --git a/src/test/regress/expected/multi_router_planner.out b/src/test/regress/expected/multi_router_planner.out index d12ca62fd..c2996c6cb 100644 --- a/src/test/regress/expected/multi_router_planner.out +++ b/src/test/regress/expected/multi_router_planner.out @@ -916,8 +916,8 @@ SELECT * FROM ( (SELECT * FROM articles_hash WHERE author_id = 1) UNION (SELECT * FROM articles_hash WHERE author_id = 2)) uu; -ERROR: cannot perform distributed planning on this 
query -DETAIL: Subqueries without group by clause are not supported yet +ERROR: cannot push down this subquery +DETAIL: Currently all leaf queries need to have same filters on partition column -- error out for queries with repartition jobs SELECT * FROM articles_hash a, articles_hash b diff --git a/src/test/regress/expected/multi_subquery_behavioral_analytics.out b/src/test/regress/expected/multi_subquery_behavioral_analytics.out index fa49997d5..5d2a93480 100644 --- a/src/test/regress/expected/multi_subquery_behavioral_analytics.out +++ b/src/test/regress/expected/multi_subquery_behavioral_analytics.out @@ -6,7 +6,6 @@ -- We don't need shard id sequence here, so commented out to prevent conflicts with concurrent tests -- ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1430000; -- ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1430000; -SET citus.subquery_pushdown TO TRUE; SET citus.enable_router_execution TO FALSE; ------------------------------------ -- Vanilla funnel query @@ -1673,5 +1672,4 @@ WHERE b.user_id IS NULL GROUP BY a.user_id; ERROR: cannot push down this subquery DETAIL: Table expressions other than simple relations and subqueries are currently unsupported -SET citus.subquery_pushdown TO FALSE; SET citus.enable_router_execution TO TRUE; diff --git a/src/test/regress/expected/multi_subquery_complex_queries.out b/src/test/regress/expected/multi_subquery_complex_queries.out index 294327710..636a0f553 100644 --- a/src/test/regress/expected/multi_subquery_complex_queries.out +++ b/src/test/regress/expected/multi_subquery_complex_queries.out @@ -7,7 +7,6 @@ -- ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1400000; ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1400000; -SET citus.subquery_pushdown TO TRUE; SET citus.enable_router_execution TO FALSE; -- -- UNIONs and JOINs mixed @@ -2025,8 +2024,8 @@ FROM ( ) user_id ORDER BY 1, 2 limit 10; -ERROR: cannot push down this subquery -DETAIL: Window functions are currently unsupported +ERROR: cannot perform distributed planning on this query +DETAIL: Subqueries without group by clause are not supported yet -- not supported due to non relation rte SELECT ("final_query"."event_types") as types, count(*) AS sumOfEventType FROM @@ -2113,5 +2112,4 @@ GROUP BY types ORDER BY types; ERROR: cannot push down this subquery DETAIL: Subqueries without relations are unsupported -SET citus.subquery_pushdown TO FALSE; SET citus.enable_router_execution TO TRUE; diff --git a/src/test/regress/expected/multi_subquery_union.out b/src/test/regress/expected/multi_subquery_union.out index 3b2be3465..f6ea5e9ac 100644 --- a/src/test/regress/expected/multi_subquery_union.out +++ b/src/test/regress/expected/multi_subquery_union.out @@ -4,7 +4,6 @@ -- the tables that are used depends to multi_insert_select_behavioral_analytics_create_table.sql -- We don't need shard id sequence here, so commented out to prevent conflicts with concurrent tests -- ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1400000; -SET citus.subquery_pushdown TO true; SET citus.enable_router_execution TO false; -- a very simple union query SELECT user_id, counter @@ -845,5 +844,4 @@ GROUP BY types ORDER BY types; ERROR: cannot push down this subquery DETAIL: Subqueries without relations are unsupported -SET citus.subquery_pushdown TO false; SET citus.enable_router_execution TO true; diff --git a/src/test/regress/expected/multi_view.out b/src/test/regress/expected/multi_view.out index f8444c562..159ca78f2 100644 --- a/src/test/regress/expected/multi_view.out +++ 
b/src/test/regress/expected/multi_view.out @@ -234,16 +234,33 @@ SET citus.task_executor_type to DEFAULT; -- create a view with aggregate CREATE VIEW lineitems_by_shipping_method AS SELECT l_shipmode, count(*) as cnt FROM lineitem_hash_part GROUP BY 1; --- following will fail due to non-flattening of subquery due to GROUP BY +-- following will fail due to non GROUP BY of partition key SELECT * FROM lineitems_by_shipping_method; ERROR: Unrecognized range table id 1 -- create a view with group by on partition column CREATE VIEW lineitems_by_orderkey AS - SELECT l_orderkey, count(*) FROM lineitem_hash_part GROUP BY 1; --- this will also fail due to same reason -SELECT * FROM lineitems_by_orderkey; -ERROR: Unrecognized range table id 1 --- however it would work if it is made router plannable + SELECT + l_orderkey, count(*) + FROM + lineitem_hash_part + GROUP BY 1; +-- this should work since we're able to push down this query +SELECT * FROM lineitems_by_orderkey ORDER BY 2 DESC, 1 ASC LIMIT 10; + l_orderkey | count +------------+------- + 7 | 7 + 68 | 7 + 129 | 7 + 164 | 7 + 194 | 7 + 225 | 7 + 226 | 7 + 322 | 7 + 326 | 7 + 354 | 7 +(10 rows) + +-- it would also work since it is made router plannable SELECT * FROM lineitems_by_orderkey WHERE l_orderkey = 100; l_orderkey | count ------------+------- diff --git a/src/test/regress/input/multi_complex_count_distinct.source b/src/test/regress/input/multi_complex_count_distinct.source index ffd4438a1..1ca2cc617 100644 --- a/src/test/regress/input/multi_complex_count_distinct.source +++ b/src/test/regress/input/multi_complex_count_distinct.source @@ -130,9 +130,9 @@ SELECT * SELECT * FROM ( SELECT - l_orderkey, count(DISTINCT 1) + l_linenumber, count(DISTINCT 1) FROM lineitem_hash - GROUP BY l_orderkey) sub + GROUP BY l_linenumber) sub ORDER BY 2 DESC, 1 DESC LIMIT 10; @@ -140,9 +140,9 @@ SELECT * SELECT * FROM ( SELECT - l_orderkey, count(DISTINCT (random() * 5)::int) + l_linenumber, count(DISTINCT (random() * 5)::int) FROM lineitem_hash - GROUP BY l_orderkey) sub + GROUP BY l_linenumber) sub ORDER BY 2 DESC, 1 DESC LIMIT 10; @@ -277,18 +277,18 @@ SELECT * SELECT * FROM ( SELECT - l_orderkey, sum(DISTINCT l_partkey) + l_linenumber, sum(DISTINCT l_partkey) FROM lineitem_hash - GROUP BY l_orderkey) sub + GROUP BY l_linenumber) sub ORDER BY 2 DESC, 1 DESC LIMIT 10; SELECT * FROM ( SELECT - l_orderkey, avg(DISTINCT l_partkey) + l_linenumber, avg(DISTINCT l_partkey) FROM lineitem_hash - GROUP BY l_orderkey) sub + GROUP BY l_linenumber) sub ORDER BY 2 DESC, 1 DESC LIMIT 10; @@ -297,18 +297,18 @@ SELECT * SELECT * FROM ( SELECT - l_orderkey, count(DISTINCT lineitem_hash) + l_linenumber, count(DISTINCT lineitem_hash) FROM lineitem_hash - GROUP BY l_orderkey) sub + GROUP BY l_linenumber) sub ORDER BY 2 DESC, 1 DESC LIMIT 10; SELECT * FROM ( SELECT - l_orderkey, count(DISTINCT lineitem_hash.*) + l_linenumber, count(DISTINCT lineitem_hash.*) FROM lineitem_hash - GROUP BY l_orderkey) sub + GROUP BY l_linenumber) sub ORDER BY 2 DESC, 1 DESC LIMIT 10; diff --git a/src/test/regress/input/multi_subquery.source b/src/test/regress/input/multi_subquery.source index 9d84e6a74..94fb2c806 100644 --- a/src/test/regress/input/multi_subquery.source +++ b/src/test/regress/input/multi_subquery.source @@ -46,7 +46,7 @@ SELECT master_create_distributed_table('orders_subquery', 'o_orderkey', 'range') SET citus.enable_router_execution TO 'false'; --- Check that we don't allow subquery pushdown in default settings. +-- Check that we allow subquery pushdown in default settings. 
SELECT avg(unit_price) @@ -62,8 +62,6 @@ FROM GROUP BY l_orderkey) AS unit_prices; -SET citus.subquery_pushdown to TRUE; - -- Check that we don't crash if there are not any shards. SELECT @@ -130,7 +128,8 @@ FROM UPDATE pg_dist_shard SET shardmaxvalue = '14947' WHERE shardid = 270003; --- If group by is not on partition column then we error out. +-- If group by is not on partition column then we error out from single table +-- repartition code path SELECT avg(order_count) @@ -227,6 +226,8 @@ FROM ( count(*) a FROM lineitem_subquery + GROUP BY + l_orderkey ) z; -- Check supported subquery types. @@ -337,7 +338,6 @@ CREATE TABLE subquery_pruning_varchar_test_table SELECT master_create_distributed_table('subquery_pruning_varchar_test_table', 'a', 'hash'); SELECT master_create_worker_shards('subquery_pruning_varchar_test_table', 4, 1); -SET citus.subquery_pushdown TO TRUE; SET client_min_messages TO DEBUG2; SELECT * FROM @@ -925,8 +925,6 @@ WHERE shardid = :new_shard_id; \. -SET citus.subquery_pushdown TO TRUE; - -- Simple join subquery pushdown SELECT avg(array_length(events, 1)) AS event_average diff --git a/src/test/regress/output/multi_complex_count_distinct.source b/src/test/regress/output/multi_complex_count_distinct.source index 707438761..59b4045ad 100644 --- a/src/test/regress/output/multi_complex_count_distinct.source +++ b/src/test/regress/output/multi_complex_count_distinct.source @@ -241,9 +241,9 @@ SELECT * SELECT * FROM ( SELECT - l_orderkey, count(DISTINCT 1) + l_linenumber, count(DISTINCT 1) FROM lineitem_hash - GROUP BY l_orderkey) sub + GROUP BY l_linenumber) sub ORDER BY 2 DESC, 1 DESC LIMIT 10; ERROR: cannot compute aggregate (distinct) @@ -253,9 +253,9 @@ HINT: You can load the hll extension from contrib packages and enable distinct SELECT * FROM ( SELECT - l_orderkey, count(DISTINCT (random() * 5)::int) + l_linenumber, count(DISTINCT (random() * 5)::int) FROM lineitem_hash - GROUP BY l_orderkey) sub + GROUP BY l_linenumber) sub ORDER BY 2 DESC, 1 DESC LIMIT 10; ERROR: cannot compute aggregate (distinct) @@ -464,9 +464,9 @@ SELECT * SELECT * FROM ( SELECT - l_orderkey, sum(DISTINCT l_partkey) + l_linenumber, sum(DISTINCT l_partkey) FROM lineitem_hash - GROUP BY l_orderkey) sub + GROUP BY l_linenumber) sub ORDER BY 2 DESC, 1 DESC LIMIT 10; ERROR: cannot compute aggregate (distinct) @@ -474,9 +474,9 @@ DETAIL: Only count(distinct) aggregate is supported in subqueries SELECT * FROM ( SELECT - l_orderkey, avg(DISTINCT l_partkey) + l_linenumber, avg(DISTINCT l_partkey) FROM lineitem_hash - GROUP BY l_orderkey) sub + GROUP BY l_linenumber) sub ORDER BY 2 DESC, 1 DESC LIMIT 10; ERROR: cannot compute aggregate (distinct) @@ -486,9 +486,9 @@ DETAIL: Only count(distinct) aggregate is supported in subqueries SELECT * FROM ( SELECT - l_orderkey, count(DISTINCT lineitem_hash) + l_linenumber, count(DISTINCT lineitem_hash) FROM lineitem_hash - GROUP BY l_orderkey) sub + GROUP BY l_linenumber) sub ORDER BY 2 DESC, 1 DESC LIMIT 10; ERROR: cannot compute count (distinct) @@ -496,9 +496,9 @@ DETAIL: Non-column references are not supported yet SELECT * FROM ( SELECT - l_orderkey, count(DISTINCT lineitem_hash.*) + l_linenumber, count(DISTINCT lineitem_hash.*) FROM lineitem_hash - GROUP BY l_orderkey) sub + GROUP BY l_linenumber) sub ORDER BY 2 DESC, 1 DESC LIMIT 10; ERROR: cannot compute count (distinct) diff --git a/src/test/regress/output/multi_subquery.source b/src/test/regress/output/multi_subquery.source index e2bff0987..7b8c2a56b 100644 --- 
a/src/test/regress/output/multi_subquery.source +++ b/src/test/regress/output/multi_subquery.source @@ -53,7 +53,7 @@ SELECT master_create_distributed_table('orders_subquery', 'o_orderkey', 'range') (1 row) SET citus.enable_router_execution TO 'false'; --- Check that we don't allow subquery pushdown in default settings. +-- Check that we allow subquery pushdown in default settings. SELECT avg(unit_price) FROM @@ -67,9 +67,11 @@ FROM l_orderkey = o_orderkey GROUP BY l_orderkey) AS unit_prices; -ERROR: cannot perform distributed planning on this query -DETAIL: Join in subqueries is not supported yet -SET citus.subquery_pushdown to TRUE; + avg +----- + +(1 row) + -- Check that we don't crash if there are not any shards. SELECT avg(unit_price) @@ -129,7 +131,8 @@ ERROR: cannot push down this subquery DETAIL: Shards of relations in subquery need to have 1-to-1 shard partitioning -- Update metadata in order to make all shards equal. UPDATE pg_dist_shard SET shardmaxvalue = '14947' WHERE shardid = 270003; --- If group by is not on partition column then we error out. +-- If group by is not on partition column then we error out from single table +-- repartition code path SELECT avg(order_count) FROM @@ -140,8 +143,8 @@ FROM lineitem_subquery GROUP BY l_suppkey) AS order_counts; -ERROR: cannot push down this subquery -DETAIL: Group by list without partition column is currently unsupported +ERROR: cannot use real time executor with repartition jobs +HINT: Set citus.task_executor_type to "task-tracker". -- Check that we error out if join is not on partition columns. SELECT avg(unit_price) @@ -223,6 +226,8 @@ FROM ( count(*) a FROM lineitem_subquery + GROUP BY + l_orderkey ) z; ERROR: cannot push down this subquery DETAIL: distinct in the outermost query is unsupported @@ -357,7 +362,6 @@ SELECT master_create_worker_shards('subquery_pruning_varchar_test_table', 4, 1); (1 row) -SET citus.subquery_pushdown TO TRUE; SET client_min_messages TO DEBUG2; SELECT * FROM (SELECT count(*) FROM subquery_pruning_varchar_test_table WHERE a = 'onder' GROUP BY a) @@ -884,7 +888,6 @@ SELECT master_create_empty_shard('users') AS new_shard_id UPDATE pg_dist_shard SET shardminvalue = '(2,2000000001)', shardmaxvalue = '(2,4300000000)' WHERE shardid = :new_shard_id; \COPY users FROM STDIN WITH CSV -SET citus.subquery_pushdown TO TRUE; -- Simple join subquery pushdown SELECT avg(array_length(events, 1)) AS event_average diff --git a/src/test/regress/output/multi_subquery_0.source b/src/test/regress/output/multi_subquery_0.source index 0f719f196..4c27449f6 100644 --- a/src/test/regress/output/multi_subquery_0.source +++ b/src/test/regress/output/multi_subquery_0.source @@ -53,7 +53,7 @@ SELECT master_create_distributed_table('orders_subquery', 'o_orderkey', 'range') (1 row) SET citus.enable_router_execution TO 'false'; --- Check that we don't allow subquery pushdown in default settings. +-- Check that we allow subquery pushdown in default settings. SELECT avg(unit_price) FROM @@ -67,9 +67,11 @@ FROM l_orderkey = o_orderkey GROUP BY l_orderkey) AS unit_prices; -ERROR: cannot perform distributed planning on this query -DETAIL: Join in subqueries is not supported yet -SET citus.subquery_pushdown to TRUE; + avg +----- + +(1 row) + -- Check that we don't crash if there are not any shards. SELECT avg(unit_price) @@ -129,7 +131,8 @@ ERROR: cannot push down this subquery DETAIL: Shards of relations in subquery need to have 1-to-1 shard partitioning -- Update metadata in order to make all shards equal. 
UPDATE pg_dist_shard SET shardmaxvalue = '14947' WHERE shardid = 270003; --- If group by is not on partition column then we error out. +-- If group by is not on partition column then we error out from single table +-- repartition code path SELECT avg(order_count) FROM @@ -140,8 +143,8 @@ FROM lineitem_subquery GROUP BY l_suppkey) AS order_counts; -ERROR: cannot push down this subquery -DETAIL: Group by list without partition column is currently unsupported +ERROR: cannot use real time executor with repartition jobs +HINT: Set citus.task_executor_type to "task-tracker". -- Check that we error out if join is not on partition columns. SELECT avg(unit_price) @@ -223,6 +226,8 @@ FROM ( count(*) a FROM lineitem_subquery + GROUP BY + l_orderkey ) z; ERROR: cannot push down this subquery DETAIL: distinct in the outermost query is unsupported @@ -357,7 +362,6 @@ SELECT master_create_worker_shards('subquery_pruning_varchar_test_table', 4, 1); (1 row) -SET citus.subquery_pushdown TO TRUE; SET client_min_messages TO DEBUG2; SELECT * FROM (SELECT count(*) FROM subquery_pruning_varchar_test_table WHERE a = 'onder' GROUP BY a) @@ -884,7 +888,6 @@ SELECT master_create_empty_shard('users') AS new_shard_id UPDATE pg_dist_shard SET shardminvalue = '(2,2000000001)', shardmaxvalue = '(2,4300000000)' WHERE shardid = :new_shard_id; \COPY users FROM STDIN WITH CSV -SET citus.subquery_pushdown TO TRUE; -- Simple join subquery pushdown SELECT avg(array_length(events, 1)) AS event_average diff --git a/src/test/regress/sql/multi_complex_expressions.sql b/src/test/regress/sql/multi_complex_expressions.sql index 236e8f4c6..4891b2103 100644 --- a/src/test/regress/sql/multi_complex_expressions.sql +++ b/src/test/regress/sql/multi_complex_expressions.sql @@ -161,13 +161,11 @@ SELECT count(*) FROM lineitem, orders WHERE l_orderkey + 1 = o_orderkey; -- Check that we can issue limit/offset queries -- OFFSET in subqueries are not supported --- Error in the planner when subquery pushdown is off -SELECT * FROM (SELECT o_orderkey FROM orders ORDER BY o_orderkey OFFSET 20) sq; -SET citus.subquery_pushdown TO true; +-- Error in the planner when single repartition subquery +SELECT * FROM (SELECT o_custkey FROM orders GROUP BY o_custkey ORDER BY o_custkey OFFSET 20) sq; -- Error in the optimizer when subquery pushdown is on SELECT * FROM (SELECT o_orderkey FROM orders ORDER BY o_orderkey OFFSET 20) sq; -SET citus.subquery_pushdown TO false; -- Simple LIMIT/OFFSET with ORDER BY SELECT o_orderkey FROM orders ORDER BY o_orderkey LIMIT 10 OFFSET 20; diff --git a/src/test/regress/sql/multi_subquery_behavioral_analytics.sql b/src/test/regress/sql/multi_subquery_behavioral_analytics.sql index ee86bf1f1..14fe41709 100644 --- a/src/test/regress/sql/multi_subquery_behavioral_analytics.sql +++ b/src/test/regress/sql/multi_subquery_behavioral_analytics.sql @@ -8,7 +8,6 @@ -- ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1430000; -- ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1430000; -SET citus.subquery_pushdown TO TRUE; SET citus.enable_router_execution TO FALSE; ------------------------------------ @@ -1346,5 +1345,4 @@ FROM (SELECT WHERE b.user_id IS NULL GROUP BY a.user_id; -SET citus.subquery_pushdown TO FALSE; SET citus.enable_router_execution TO TRUE; diff --git a/src/test/regress/sql/multi_subquery_complex_queries.sql b/src/test/regress/sql/multi_subquery_complex_queries.sql index 018758b8e..412c73793 100644 --- a/src/test/regress/sql/multi_subquery_complex_queries.sql +++ 
b/src/test/regress/sql/multi_subquery_complex_queries.sql @@ -8,7 +8,6 @@ -- ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1400000; ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1400000; -SET citus.subquery_pushdown TO TRUE; SET citus.enable_router_execution TO FALSE; -- @@ -1870,5 +1869,4 @@ INNER JOIN GROUP BY types ORDER BY types; -SET citus.subquery_pushdown TO FALSE; SET citus.enable_router_execution TO TRUE; diff --git a/src/test/regress/sql/multi_subquery_union.sql b/src/test/regress/sql/multi_subquery_union.sql index 4daac4cce..7589128b4 100644 --- a/src/test/regress/sql/multi_subquery_union.sql +++ b/src/test/regress/sql/multi_subquery_union.sql @@ -6,7 +6,6 @@ -- We don't need shard id sequence here, so commented out to prevent conflicts with concurrent tests -- ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1400000; -SET citus.subquery_pushdown TO true; SET citus.enable_router_execution TO false; -- a very simple union query SELECT user_id, counter @@ -673,5 +672,4 @@ FROM GROUP BY types ORDER BY types; -SET citus.subquery_pushdown TO false; SET citus.enable_router_execution TO true; diff --git a/src/test/regress/sql/multi_view.sql b/src/test/regress/sql/multi_view.sql index 164e5a48f..af7294334 100644 --- a/src/test/regress/sql/multi_view.sql +++ b/src/test/regress/sql/multi_view.sql @@ -123,17 +123,21 @@ SET citus.task_executor_type to DEFAULT; CREATE VIEW lineitems_by_shipping_method AS SELECT l_shipmode, count(*) as cnt FROM lineitem_hash_part GROUP BY 1; --- following will fail due to non-flattening of subquery due to GROUP BY +-- following will fail due to non GROUP BY of partition key SELECT * FROM lineitems_by_shipping_method; -- create a view with group by on partition column CREATE VIEW lineitems_by_orderkey AS - SELECT l_orderkey, count(*) FROM lineitem_hash_part GROUP BY 1; + SELECT + l_orderkey, count(*) + FROM + lineitem_hash_part + GROUP BY 1; --- this will also fail due to same reason -SELECT * FROM lineitems_by_orderkey; +-- this should work since we're able to push down this query +SELECT * FROM lineitems_by_orderkey ORDER BY 2 DESC, 1 ASC LIMIT 10; --- however it would work if it is made router plannable +-- it would also work since it is made router plannable SELECT * FROM lineitems_by_orderkey WHERE l_orderkey = 100; DROP TABLE temp_lineitem CASCADE;