Change behaviour of subquery pushdown flag (#1315)

This commit changes the behaviour of the citus.subquery_pushdown flag.
Before this commit, the flag was used to enable the subquery pushdown logic.
With this commit, that behaviour is enabled by default, which makes the flag
effectively a no-op. We keep the flag to avoid breaking backward
compatibility, and we may repurpose it in future commits.
pull/1348/head
Onder Kalaci 2017-04-18 17:49:41 +03:00
parent d90770fb53
commit 40e0ec6ee5
25 changed files with 1204 additions and 1010 deletions
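As a sketch of the behaviour change (the table and column names below are
hypothetical, written in the style of the regression tests; only the SET
statements are taken from them):

-- Before this commit, subquery pushdown had to be requested explicitly:
SET citus.subquery_pushdown TO true;
SELECT avg(cnt)
FROM (SELECT user_id, count(*) AS cnt FROM events GROUP BY user_id) AS counts;
SET citus.subquery_pushdown TO false;
-- After this commit, the same query goes through the pushdown logic by
-- default, so the surrounding SET statements are no longer needed.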


@@ -48,7 +48,6 @@
 #include "utils/fmgroids.h"
 #include "utils/lsyscache.h"
 #include "utils/rel.h"
-#include "utils/relcache.h"
 #include "utils/syscache.h"
 #include "utils/tqual.h"
@@ -148,28 +147,6 @@ static bool TablePartitioningSupportsDistinct(List *tableNodeList,
 Var *distinctColumn);
 static bool GroupedByColumn(List *groupClauseList, List *targetList, Var *column);
-/* Local functions forward declarations for subquery pushdown checks */
-static void ErrorIfContainsUnsupportedSubquery(MultiNode *logicalPlanNode,
-PlannerRestrictionContext *
-plannerRestrictionContext,
-Query *originalQuery);
-static void ErrorIfCannotPushdownSubquery(Query *subqueryTree, bool outerQueryHasLimit);
-static void ErrorIfUnsupportedSetOperation(Query *subqueryTree, bool outerQueryHasLimit);
-static bool ExtractSetOperationStatmentWalker(Node *node, List **setOperationList);
-static void ErrorIfUnsupportedTableCombination(Query *queryTree);
-static bool TargetListOnPartitionColumn(Query *query, List *targetEntryList);
-static FieldSelect * CompositeFieldRecursive(Expr *expression, Query *query);
-static bool FullCompositeFieldList(List *compositeFieldList);
-static void ErrorIfUnsupportedShardDistribution(Query *query);
-static List * RelationIdList(Query *query);
-static bool CoPartitionedTables(Oid firstRelationId, Oid secondRelationId);
-static bool ShardIntervalsEqual(FmgrInfo *comparisonFunction,
-ShardInterval *firstInterval,
-ShardInterval *secondInterval);
-static void ErrorIfUnsupportedFilters(Query *subquery);
-static bool EqualOpExpressionLists(List *firstOpExpressionList,
-List *secondOpExpressionList);
 /* Local functions forward declarations for limit clauses */
 static Node * WorkerLimitCount(MultiExtendedOp *originalOpNode);
 static List * WorkerSortClauseList(MultiExtendedOp *originalOpNode);
@@ -192,16 +169,9 @@ static bool HasOrderByHllType(List *sortClauseList, List *targetList);
 * Third, the function pulls up the collect operators in the tree. Fourth, the
 * function finds the extended operator node, and splits this node into master
 * and worker extended operator nodes.
- *
- * We also pass plannerRestrictionContext and originalQuery to the optimizer.
- * These are primarily used to decide whether the subquery is safe to pushdown.
- * If not, it helps to produce meaningful error messages for subquery
- * pushdown planning.
 */
 void
-MultiLogicalPlanOptimize(MultiTreeRoot *multiLogicalPlan,
-PlannerRestrictionContext *plannerRestrictionContext,
-Query *originalQuery)
+MultiLogicalPlanOptimize(MultiTreeRoot *multiLogicalPlan)
 {
 bool hasOrderByHllType = false;
 List *selectNodeList = NIL;
@@ -220,10 +190,6 @@ MultiLogicalPlanOptimize(MultiTreeRoot *multiLogicalPlan,
 /* check that we can optimize aggregates in the plan */
 ErrorIfContainsUnsupportedAggregate(logicalPlanNode);
-/* check that we can pushdown subquery in the plan */
-ErrorIfContainsUnsupportedSubquery(logicalPlanNode, plannerRestrictionContext,
-originalQuery);
 /*
 * If a select node exists, we use the idempower property to split the node
 * into two nodes that contain And and Or clauses. If both And and Or nodes
@@ -2809,81 +2775,6 @@ GroupedByColumn(List *groupClauseList, List *targetList, Var *column)
 }
-/*
-* ErrorIfContainsUnsupportedSubquery extracts subquery multi table from the
-* logical plan using plannerRestrictionContext and the original query. It uses
-* some helper functions to check if we can push down subquery to worker nodes.
-* These helper functions error out if we cannot push down the subquery.
-*/
-static void
-ErrorIfContainsUnsupportedSubquery(MultiNode *logicalPlanNode,
-PlannerRestrictionContext *plannerRestrictionContext,
-Query *originalQuery)
-{
-Query *subquery = NULL;
-List *extendedOpNodeList = NIL;
-MultiTable *multiTable = NULL;
-MultiExtendedOp *extendedOpNode = NULL;
-bool outerQueryHasLimit = false;
-RelationRestrictionContext *relationRestrictionContext =
-plannerRestrictionContext->relationRestrictionContext;
-/* check if logical plan includes a subquery */
-List *subqueryMultiTableList = SubqueryMultiTableList(logicalPlanNode);
-if (subqueryMultiTableList == NIL)
-{
-return;
-}
-/* currently in the planner we only allow one subquery in from-clause*/
-Assert(list_length(subqueryMultiTableList) == 1);
-/*
-* We're checking two things here:
-* (i) If the query contains a top level union, ensure that all leaves
-* return the partition key at the same position
-* (ii) Else, check whether all relations joined on the partition key or not
-*/
-if (ContainsUnionSubquery(originalQuery))
-{
-if (!SafeToPushdownUnionSubquery(relationRestrictionContext))
-{
-ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-errmsg("cannot pushdown the subquery since all leaves of "
-"the UNION does not include partition key at the "
-"same position"),
-errdetail("Each leaf query of the UNION should return "
-"partition key at the same position on its "
-"target list.")));
-}
-}
-else if (!RestrictionEquivalenceForPartitionKeys(plannerRestrictionContext))
-{
-ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-errmsg("cannot pushdown the subquery since all relations are not "
-"joined using distribution keys"),
-errdetail("Each relation should be joined with at least "
-"one another relation using distribution keys and "
-"equality operator.")));
-}
-multiTable = (MultiTable *) linitial(subqueryMultiTableList);
-subquery = multiTable->subquery;
-extendedOpNodeList = FindNodesOfType(logicalPlanNode, T_MultiExtendedOp);
-extendedOpNode = (MultiExtendedOp *) linitial(extendedOpNodeList);
-if (extendedOpNode->limitCount)
-{
-outerQueryHasLimit = true;
-}
-ErrorIfCannotPushdownSubquery(subquery, outerQueryHasLimit);
-ErrorIfUnsupportedShardDistribution(subquery);
-ErrorIfUnsupportedFilters(subquery);
-}
 /*
 * SubqueryMultiTableList extracts multi tables in the given logical plan tree
 * and returns subquery multi tables in a new list.
@@ -2910,290 +2801,6 @@ SubqueryMultiTableList(MultiNode *multiNode)
 }
-/*
-* ErrorIfCannotPushdownSubquery recursively checks if we can push down the given
-* subquery to worker nodes. If we cannot push down the subquery, this function
-* errors out.
-*
-* We can push down a subquery if it follows rules below. We support nested queries
-* as long as they follow the same rules, and we recurse to validate each subquery
-* for this given query.
-* a. If there is an aggregate, it must be grouped on partition column.
-* b. If there is a join, it must be between two regular tables or two subqueries.
-* We don't support join between a regular table and a subquery. And columns on
-* the join condition must be partition columns.
-* c. If there is a distinct clause, it must be on the partition column.
-*
-* This function is very similar to ErrorIfQueryNotSupported() in logical
-* planner, but we don't reuse it, because differently for subqueries we support
-* a subset of distinct, union and left joins.
-*
-* Note that this list of checks is not exhaustive, there can be some cases
-* which we let subquery to run but returned results would be wrong. Such as if
-* a subquery has a group by on another subquery which includes order by with
-* limit, we let this query to run, but results could be wrong depending on the
-* features of underlying tables.
-*/
-static void
-ErrorIfCannotPushdownSubquery(Query *subqueryTree, bool outerQueryHasLimit)
-{
-bool preconditionsSatisfied = true;
-char *errorDetail = NULL;
-List *subqueryEntryList = NIL;
-ListCell *rangeTableEntryCell = NULL;
-ErrorIfUnsupportedTableCombination(subqueryTree);
-if (subqueryTree->hasSubLinks)
-{
-preconditionsSatisfied = false;
-errorDetail = "Subqueries other than from-clause subqueries are unsupported";
-}
-if (subqueryTree->rtable == NIL)
-{
-preconditionsSatisfied = false;
-errorDetail = "Subqueries without relations are unsupported";
-}
-if (subqueryTree->hasWindowFuncs)
-{
-preconditionsSatisfied = false;
-errorDetail = "Window functions are currently unsupported";
-}
-if (subqueryTree->limitOffset)
-{
-preconditionsSatisfied = false;
-errorDetail = "Offset clause is currently unsupported";
-}
-if (subqueryTree->limitCount && !outerQueryHasLimit)
-{
-preconditionsSatisfied = false;
-errorDetail = "Limit in subquery without limit in the outer query is unsupported";
-}
-if (subqueryTree->setOperations)
-{
-ErrorIfUnsupportedSetOperation(subqueryTree, outerQueryHasLimit);
-}
-if (subqueryTree->hasRecursive)
-{
-preconditionsSatisfied = false;
-errorDetail = "Recursive queries are currently unsupported";
-}
-if (subqueryTree->cteList)
-{
-preconditionsSatisfied = false;
-errorDetail = "Common Table Expressions are currently unsupported";
-}
-if (subqueryTree->hasForUpdate)
-{
-preconditionsSatisfied = false;
-errorDetail = "For Update/Share commands are currently unsupported";
-}
-/* group clause list must include partition column */
-if (subqueryTree->groupClause)
-{
-List *groupClauseList = subqueryTree->groupClause;
-List *targetEntryList = subqueryTree->targetList;
-List *groupTargetEntryList = GroupTargetEntryList(groupClauseList,
-targetEntryList);
-bool groupOnPartitionColumn = TargetListOnPartitionColumn(subqueryTree,
-groupTargetEntryList);
-if (!groupOnPartitionColumn)
-{
-preconditionsSatisfied = false;
-errorDetail = "Group by list without partition column is currently "
-"unsupported";
-}
-}
-/* we don't support aggregates without group by */
-if (subqueryTree->hasAggs && (subqueryTree->groupClause == NULL))
-{
-preconditionsSatisfied = false;
-errorDetail = "Aggregates without group by are currently unsupported";
-}
-/* having clause without group by on partition column is not supported */
-if (subqueryTree->havingQual && (subqueryTree->groupClause == NULL))
-{
-preconditionsSatisfied = false;
-errorDetail = "Having qual without group by on partition column is "
-"currently unsupported";
-}
-/* distinct clause list must include partition column */
-if (subqueryTree->distinctClause)
-{
-List *distinctClauseList = subqueryTree->distinctClause;
-List *targetEntryList = subqueryTree->targetList;
-List *distinctTargetEntryList = GroupTargetEntryList(distinctClauseList,
-targetEntryList);
-bool distinctOnPartitionColumn =
-TargetListOnPartitionColumn(subqueryTree, distinctTargetEntryList);
-if (!distinctOnPartitionColumn)
-{
-preconditionsSatisfied = false;
-errorDetail = "Distinct on columns without partition column is "
-"currently unsupported";
-}
-}
-/* finally check and error out if not satisfied */
-if (!preconditionsSatisfied)
-{
-ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-errmsg("cannot push down this subquery"),
-errdetail("%s", errorDetail)));
-}
-/* recursively do same check for subqueries of this query */
-subqueryEntryList = SubqueryEntryList(subqueryTree);
-foreach(rangeTableEntryCell, subqueryEntryList)
-{
-RangeTblEntry *rangeTableEntry =
-(RangeTblEntry *) lfirst(rangeTableEntryCell);
-Query *innerSubquery = rangeTableEntry->subquery;
-ErrorIfCannotPushdownSubquery(innerSubquery, outerQueryHasLimit);
-}
-}
-/*
-* ErrorIfUnsupportedSetOperation is a helper function for ErrorIfCannotPushdownSubquery().
-* It basically iterates over the subqueries that reside under the given set operations.
-*
-* The function also errors out for set operations INTERSECT and EXCEPT.
-*/
-static void
-ErrorIfUnsupportedSetOperation(Query *subqueryTree, bool outerQueryHasLimit)
-{
-List *rangeTableList = subqueryTree->rtable;
-List *rangeTableIndexList = NIL;
-ListCell *rangeTableIndexCell = NULL;
-List *setOperationStatementList = NIL;
-ListCell *setOperationStatmentCell = NULL;
-ExtractSetOperationStatmentWalker((Node *) subqueryTree->setOperations,
-&setOperationStatementList);
-foreach(setOperationStatmentCell, setOperationStatementList)
-{
-SetOperationStmt *setOperation =
-(SetOperationStmt *) lfirst(setOperationStatmentCell);
-if (setOperation->op != SETOP_UNION)
-{
-ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-errmsg("cannot push down this subquery"),
-errdetail("Intersect and Except are currently unsupported")));
-}
-}
-ExtractRangeTableIndexWalker((Node *) subqueryTree->setOperations,
-&rangeTableIndexList);
-foreach(rangeTableIndexCell, rangeTableIndexList)
-{
-int rangeTableIndex = lfirst_int(rangeTableIndexCell);
-RangeTblEntry *rangeTableEntry = rt_fetch(rangeTableIndex, rangeTableList);
-Assert(rangeTableEntry->rtekind == RTE_SUBQUERY);
-ErrorIfCannotPushdownSubquery(rangeTableEntry->subquery, outerQueryHasLimit);
-}
-}
-/*
-* ExtractSetOperationStatementWalker walks over a set operations statment,
-* and finds all set operations in the tree.
-*/
-static bool
-ExtractSetOperationStatmentWalker(Node *node, List **setOperationList)
-{
-bool walkerResult = false;
-if (node == NULL)
-{
-return false;
-}
-if (IsA(node, SetOperationStmt))
-{
-SetOperationStmt *setOperation = (SetOperationStmt *) node;
-(*setOperationList) = lappend(*setOperationList, setOperation);
-}
-walkerResult = expression_tree_walker(node, ExtractSetOperationStatmentWalker,
-setOperationList);
-return walkerResult;
-}
-/*
-* ErrorIfUnsupportedTableCombination checks if the given query tree contains any
-* unsupported range table combinations. For this, the function walks over all
-* range tables in the join tree, and checks if they correspond to simple relations
-* or subqueries.
-*/
-static void
-ErrorIfUnsupportedTableCombination(Query *queryTree)
-{
-List *rangeTableList = queryTree->rtable;
-List *joinTreeTableIndexList = NIL;
-ListCell *joinTreeTableIndexCell = NULL;
-bool unsupporteTableCombination = false;
-char *errorDetail = NULL;
-/*
-* Extract all range table indexes from the join tree. Note that sub-queries
-* that get pulled up by PostgreSQL don't appear in this join tree.
-*/
-ExtractRangeTableIndexWalker((Node *) queryTree->jointree, &joinTreeTableIndexList);
-foreach(joinTreeTableIndexCell, joinTreeTableIndexList)
-{
-/*
-* Join tree's range table index starts from 1 in the query tree. But,
-* list indexes start from 0.
-*/
-int joinTreeTableIndex = lfirst_int(joinTreeTableIndexCell);
-int rangeTableListIndex = joinTreeTableIndex - 1;
-RangeTblEntry *rangeTableEntry =
-(RangeTblEntry *) list_nth(rangeTableList, rangeTableListIndex);
-/*
-* Check if the range table in the join tree is a simple relation or a
-* subquery.
-*/
-if (rangeTableEntry->rtekind != RTE_RELATION &&
-rangeTableEntry->rtekind != RTE_SUBQUERY)
-{
-unsupporteTableCombination = true;
-errorDetail = "Table expressions other than simple relations and "
-"subqueries are currently unsupported";
-break;
-}
-}
-/* finally check and error out if not satisfied */
-if (unsupporteTableCombination)
-{
-ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-errmsg("cannot push down this subquery"),
-errdetail("%s", errorDetail)));
-}
-}
 /*
 * GroupTargetEntryList walks over group clauses in the given list, finds
 * matching target entries and return them in a new list.
@@ -3216,53 +2823,6 @@ GroupTargetEntryList(List *groupClauseList, List *targetEntryList)
 }
-/*
-* TargetListOnPartitionColumn checks if at least one target list entry is on
-* partition column.
-*/
-static bool
-TargetListOnPartitionColumn(Query *query, List *targetEntryList)
-{
-bool targetListOnPartitionColumn = false;
-List *compositeFieldList = NIL;
-ListCell *targetEntryCell = NULL;
-foreach(targetEntryCell, targetEntryList)
-{
-TargetEntry *targetEntry = (TargetEntry *) lfirst(targetEntryCell);
-Expr *targetExpression = targetEntry->expr;
-bool isPartitionColumn = IsPartitionColumn(targetExpression, query);
-if (isPartitionColumn)
-{
-FieldSelect *compositeField = CompositeFieldRecursive(targetExpression,
-query);
-if (compositeField)
-{
-compositeFieldList = lappend(compositeFieldList, compositeField);
-}
-else
-{
-targetListOnPartitionColumn = true;
-break;
-}
-}
-}
-/* check composite fields */
-if (!targetListOnPartitionColumn)
-{
-bool fullCompositeFieldList = FullCompositeFieldList(compositeFieldList);
-if (fullCompositeFieldList)
-{
-targetListOnPartitionColumn = true;
-}
-}
-return targetListOnPartitionColumn;
-}
 /*
 * IsPartitionColumn returns true if the given column is a partition column.
 * The function uses FindReferencedTableColumn to find the original relation
@@ -3416,265 +2976,9 @@ FindReferencedTableColumn(Expr *columnExpression, List *parentQueryList, Query *
 }
-/*
-* CompositeFieldRecursive recursively finds composite field in the query tree
-* referred by given expression. If expression does not refer to a composite
-* field, then it returns NULL.
-*
-* If expression is a field select we directly return composite field. If it is
-* a column is referenced from a subquery, then we recursively check that subquery
-* until we reach the source of that column, and find composite field. If this
-* column is referenced from join range table entry, then we resolve which join
-* column it refers and recursively use this column with the same query.
-*/
-static FieldSelect *
-CompositeFieldRecursive(Expr *expression, Query *query)
-{
-FieldSelect *compositeField = NULL;
-List *rangetableList = query->rtable;
-Index rangeTableEntryIndex = 0;
-RangeTblEntry *rangeTableEntry = NULL;
-Var *candidateColumn = NULL;
-if (IsA(expression, FieldSelect))
-{
-compositeField = (FieldSelect *) expression;
-return compositeField;
-}
-if (IsA(expression, Var))
-{
-candidateColumn = (Var *) expression;
-}
-else
-{
-return NULL;
-}
-rangeTableEntryIndex = candidateColumn->varno - 1;
-rangeTableEntry = list_nth(rangetableList, rangeTableEntryIndex);
-if (rangeTableEntry->rtekind == RTE_SUBQUERY)
-{
-Query *subquery = rangeTableEntry->subquery;
-List *targetEntryList = subquery->targetList;
-AttrNumber targetEntryIndex = candidateColumn->varattno - 1;
-TargetEntry *subqueryTargetEntry = list_nth(targetEntryList, targetEntryIndex);
-Expr *subqueryExpression = subqueryTargetEntry->expr;
-compositeField = CompositeFieldRecursive(subqueryExpression, subquery);
-}
-else if (rangeTableEntry->rtekind == RTE_JOIN)
-{
-List *joinColumnList = rangeTableEntry->joinaliasvars;
-AttrNumber joinColumnIndex = candidateColumn->varattno - 1;
-Expr *joinColumn = list_nth(joinColumnList, joinColumnIndex);
-compositeField = CompositeFieldRecursive(joinColumn, query);
-}
-return compositeField;
-}
-/*
-* FullCompositeFieldList gets a composite field list, and checks if all fields
-* of composite type are used in the list.
-*/
-static bool
-FullCompositeFieldList(List *compositeFieldList)
-{
-bool fullCompositeFieldList = true;
-bool *compositeFieldArray = NULL;
-uint32 compositeFieldCount = 0;
-uint32 fieldIndex = 0;
-ListCell *fieldSelectCell = NULL;
-foreach(fieldSelectCell, compositeFieldList)
-{
-FieldSelect *fieldSelect = (FieldSelect *) lfirst(fieldSelectCell);
-uint32 compositeFieldIndex = 0;
-Expr *fieldExpression = fieldSelect->arg;
-if (!IsA(fieldExpression, Var))
-{
-continue;
-}
-if (compositeFieldArray == NULL)
-{
-uint32 index = 0;
-Var *compositeColumn = (Var *) fieldExpression;
-Oid compositeTypeId = compositeColumn->vartype;
-Oid compositeRelationId = get_typ_typrelid(compositeTypeId);
-/* get composite type attribute count */
-Relation relation = relation_open(compositeRelationId, AccessShareLock);
-compositeFieldCount = relation->rd_att->natts;
-compositeFieldArray = palloc0(compositeFieldCount * sizeof(bool));
-relation_close(relation, AccessShareLock);
-for (index = 0; index < compositeFieldCount; index++)
-{
-compositeFieldArray[index] = false;
-}
-}
-compositeFieldIndex = fieldSelect->fieldnum - 1;
-compositeFieldArray[compositeFieldIndex] = true;
-}
-for (fieldIndex = 0; fieldIndex < compositeFieldCount; fieldIndex++)
-{
-if (!compositeFieldArray[fieldIndex])
-{
-fullCompositeFieldList = false;
-}
-}
-if (compositeFieldCount == 0)
-{
-fullCompositeFieldList = false;
-}
-return fullCompositeFieldList;
-}
-/*
-* ErrorIfUnsupportedShardDistribution gets list of relations in the given query
-* and checks if two conditions below hold for them, otherwise it errors out.
-* a. Every relation is distributed by range or hash. This means shards are
-* disjoint based on the partition column.
-* b. All relations have 1-to-1 shard partitioning between them. This means
-* shard count for every relation is same and for every shard in a relation
-* there is exactly one shard in other relations with same min/max values.
-*/
-static void
-ErrorIfUnsupportedShardDistribution(Query *query)
-{
-Oid firstTableRelationId = InvalidOid;
-List *relationIdList = RelationIdList(query);
-ListCell *relationIdCell = NULL;
-uint32 relationIndex = 0;
-uint32 rangeDistributedRelationCount = 0;
-uint32 hashDistributedRelationCount = 0;
-foreach(relationIdCell, relationIdList)
-{
-Oid relationId = lfirst_oid(relationIdCell);
-char partitionMethod = PartitionMethod(relationId);
-if (partitionMethod == DISTRIBUTE_BY_RANGE)
-{
-rangeDistributedRelationCount++;
-}
-else if (partitionMethod == DISTRIBUTE_BY_HASH)
-{
-hashDistributedRelationCount++;
-}
-else
-{
-ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-errmsg("cannot push down this subquery"),
-errdetail("Currently range and hash partitioned "
-"relations are supported")));
-}
-}
-if ((rangeDistributedRelationCount > 0) && (hashDistributedRelationCount > 0))
-{
-ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-errmsg("cannot push down this subquery"),
-errdetail("A query including both range and hash "
-"partitioned relations are unsupported")));
-}
-foreach(relationIdCell, relationIdList)
-{
-Oid relationId = lfirst_oid(relationIdCell);
-bool coPartitionedTables = false;
-Oid currentRelationId = relationId;
-/* get shard list of first relation and continue for the next relation */
-if (relationIndex == 0)
-{
-firstTableRelationId = relationId;
-relationIndex++;
-continue;
-}
-/* check if this table has 1-1 shard partitioning with first table */
-coPartitionedTables = CoPartitionedTables(firstTableRelationId,
-currentRelationId);
-if (!coPartitionedTables)
-{
-ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-errmsg("cannot push down this subquery"),
-errdetail("Shards of relations in subquery need to "
-"have 1-to-1 shard partitioning")));
-}
-}
-}
-/*
-* RelationIdList returns list of unique relation ids in query tree.
-*/
-static List *
-RelationIdList(Query *query)
-{
-List *rangeTableList = NIL;
-List *tableEntryList = NIL;
-List *relationIdList = NIL;
-ListCell *tableEntryCell = NULL;
-ExtractRangeTableRelationWalker((Node *) query, &rangeTableList);
-tableEntryList = TableEntryList(rangeTableList);
-foreach(tableEntryCell, tableEntryList)
-{
-TableEntry *tableEntry = (TableEntry *) lfirst(tableEntryCell);
-Oid relationId = tableEntry->relationId;
-relationIdList = list_append_unique_oid(relationIdList, relationId);
-}
-return relationIdList;
-}
 /*
 * CoPartitionedTables checks if given two distributed tables have 1-to-1
 * shard partitioning.
-*/
-static bool
-CoPartitionedTables(Oid firstRelationId, Oid secondRelationId)
-{
-bool coPartitionedTables = true;
-uint32 intervalIndex = 0;
-DistTableCacheEntry *firstTableCache = DistributedTableCacheEntry(firstRelationId);
-DistTableCacheEntry *secondTableCache = DistributedTableCacheEntry(secondRelationId);
-ShardInterval **sortedFirstIntervalArray = firstTableCache->sortedShardIntervalArray;
-ShardInterval **sortedSecondIntervalArray =
-secondTableCache->sortedShardIntervalArray;
-uint32 firstListShardCount = firstTableCache->shardIntervalArrayLength;
-uint32 secondListShardCount = secondTableCache->shardIntervalArrayLength;
-FmgrInfo *comparisonFunction = firstTableCache->shardIntervalCompareFunction;
-if (firstListShardCount != secondListShardCount)
-{
-return false;
-}
-/* if there are not any shards just return true */
-if (firstListShardCount == 0)
-{
-return true;
-}
-Assert(comparisonFunction != NULL);
 /*
 * Check if the tables have the same colocation ID - if so, we know
 * they're colocated.
@@ -3691,126 +2995,6 @@ CoPartitionedTables(Oid firstRelationId, Oid secondRelationId)
 * interval minimum values. Then it compares every shard interval in order
 * and if any pair of shard intervals are not equal it returns false.
 */
-for (intervalIndex = 0; intervalIndex < firstListShardCount; intervalIndex++)
-{
-ShardInterval *firstInterval = sortedFirstIntervalArray[intervalIndex];
-ShardInterval *secondInterval = sortedSecondIntervalArray[intervalIndex];
-bool shardIntervalsEqual = ShardIntervalsEqual(comparisonFunction,
-firstInterval,
-secondInterval);
-if (!shardIntervalsEqual)
-{
-coPartitionedTables = false;
-break;
-}
-}
-return coPartitionedTables;
-}
-/*
-* ShardIntervalsEqual checks if given shard intervals have equal min/max values.
-*/
-static bool
-ShardIntervalsEqual(FmgrInfo *comparisonFunction, ShardInterval *firstInterval,
-ShardInterval *secondInterval)
-{
-bool shardIntervalsEqual = false;
-Datum firstMin = 0;
-Datum firstMax = 0;
-Datum secondMin = 0;
-Datum secondMax = 0;
-firstMin = firstInterval->minValue;
-firstMax = firstInterval->maxValue;
-secondMin = secondInterval->minValue;
-secondMax = secondInterval->maxValue;
-if (firstInterval->minValueExists && firstInterval->maxValueExists &&
-secondInterval->minValueExists && secondInterval->maxValueExists)
-{
-Datum minDatum = CompareCall2(comparisonFunction, firstMin, secondMin);
-Datum maxDatum = CompareCall2(comparisonFunction, firstMax, secondMax);
-int firstComparison = DatumGetInt32(minDatum);
-int secondComparison = DatumGetInt32(maxDatum);
-if (firstComparison == 0 && secondComparison == 0)
-{
-shardIntervalsEqual = true;
-}
-}
-return shardIntervalsEqual;
-}
-/*
-* ErrorIfUnsupportedFilters checks if all leaf queries in the given query have
-* same filter on the partition column. Note that if there are queries without
-* any filter on the partition column, they don't break this prerequisite.
-*/
-static void
-ErrorIfUnsupportedFilters(Query *subquery)
-{
-List *queryList = NIL;
-ListCell *queryCell = NULL;
-List *subqueryOpExpressionList = NIL;
-List *relationIdList = RelationIdList(subquery);
-/*
-* Get relation id of any relation in the subquery and create partiton column
-* for this relation. We will use this column to replace columns on operator
-* expressions on different tables. Then we compare these operator expressions
-* to see if they consist of same operator and constant value.
-*/
-Oid relationId = linitial_oid(relationIdList);
-Var *partitionColumn = PartitionColumn(relationId, 0);
-ExtractQueryWalker((Node *) subquery, &queryList);
-foreach(queryCell, queryList)
-{
-Query *query = (Query *) lfirst(queryCell);
-List *opExpressionList = NIL;
-List *newOpExpressionList = NIL;
-bool leafQuery = LeafQuery(query);
-if (!leafQuery)
-{
-continue;
-}
-opExpressionList = PartitionColumnOpExpressionList(query);
-if (opExpressionList == NIL)
-{
-continue;
-}
-newOpExpressionList = ReplaceColumnsInOpExpressionList(opExpressionList,
-partitionColumn);
-if (subqueryOpExpressionList == NIL)
-{
-subqueryOpExpressionList = newOpExpressionList;
-}
-else
-{
-bool equalOpExpressionLists = EqualOpExpressionLists(subqueryOpExpressionList,
-newOpExpressionList);
-if (!equalOpExpressionLists)
-{
-ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-errmsg("cannot push down this subquery"),
-errdetail("Currently all leaf queries need to "
-"have same filters on partition column")));
-}
-}
-}
-}
-/*
 * ExtractQueryWalker walks over a query, and finds all queries in the query
 * tree and returns these queries.
 */
@@ -4008,51 +3192,6 @@ ReplaceColumnsInOpExpressionList(List *opExpressionList, Var *newColumn)
 }
-/*
-* EqualOpExpressionLists checks if given two operator expression lists are
-* equal.
-*/
-static bool
-EqualOpExpressionLists(List *firstOpExpressionList, List *secondOpExpressionList)
-{
-bool equalOpExpressionLists = false;
-ListCell *firstOpExpressionCell = NULL;
-uint32 equalOpExpressionCount = 0;
-uint32 firstOpExpressionCount = list_length(firstOpExpressionList);
-uint32 secondOpExpressionCount = list_length(secondOpExpressionList);
-if (firstOpExpressionCount != secondOpExpressionCount)
-{
-return false;
-}
-foreach(firstOpExpressionCell, firstOpExpressionList)
-{
-OpExpr *firstOpExpression = (OpExpr *) lfirst(firstOpExpressionCell);
-ListCell *secondOpExpressionCell = NULL;
-foreach(secondOpExpressionCell, secondOpExpressionList)
-{
-OpExpr *secondOpExpression = (OpExpr *) lfirst(secondOpExpressionCell);
-bool equalExpressions = equal(firstOpExpression, secondOpExpression);
-if (equalExpressions)
-{
-equalOpExpressionCount++;
-continue;
-}
-}
-}
-if (equalOpExpressionCount == firstOpExpressionCount)
-{
-equalOpExpressionLists = true;
-}
-return equalOpExpressionLists;
-}
 /*
 * WorkerLimitCount checks if the given extended node contains a limit node, and
 * if that node can be pushed down. For this, the function checks if this limit

File diff suppressed because it is too large


@@ -124,6 +124,11 @@ static Job * BuildJobTreeTaskList(Job *jobTree,
 PlannerRestrictionContext *plannerRestrictionContext);
 static List * SubquerySqlTaskList(Job *job,
 PlannerRestrictionContext *plannerRestrictionContext);
+static void ErrorIfUnsupportedShardDistribution(Query *query);
+static bool CoPartitionedTables(Oid firstRelationId, Oid secondRelationId);
+static bool ShardIntervalsEqual(FmgrInfo *comparisonFunction,
+ShardInterval *firstInterval,
+ShardInterval *secondInterval);
 static Task * SubqueryTaskCreate(Query *originalQuery, ShardInterval *shardInterval,
 RelationRestrictionContext *restrictionContext,
 uint32 taskId);
@@ -2031,6 +2036,9 @@ SubquerySqlTaskList(Job *job, PlannerRestrictionContext *plannerRestrictionConte
 RelationRestrictionContext *relationRestrictionContext =
 plannerRestrictionContext->relationRestrictionContext;
+/* error if shards are not co-partitioned */
+ErrorIfUnsupportedShardDistribution(subquery);
 /* get list of all range tables in subquery tree */
 ExtractRangeTableRelationWalker((Node *) subquery, &rangeTableList);
@@ -2082,6 +2090,171 @@ SubquerySqlTaskList(Job *job, PlannerRestrictionContext *plannerRestrictionConte
 }
+/*
+* ErrorIfUnsupportedShardDistribution gets list of relations in the given query
+* and checks if two conditions below hold for them, otherwise it errors out.
+* a. Every relation is distributed by range or hash. This means shards are
+* disjoint based on the partition column.
+* b. All relations have 1-to-1 shard partitioning between them. This means
+* shard count for every relation is same and for every shard in a relation
+* there is exactly one shard in other relations with same min/max values.
+*/
+static void
+ErrorIfUnsupportedShardDistribution(Query *query)
+{
+Oid firstTableRelationId = InvalidOid;
+List *relationIdList = RelationIdList(query);
+ListCell *relationIdCell = NULL;
+uint32 relationIndex = 0;
+uint32 rangeDistributedRelationCount = 0;
+uint32 hashDistributedRelationCount = 0;
+foreach(relationIdCell, relationIdList)
+{
+Oid relationId = lfirst_oid(relationIdCell);
+char partitionMethod = PartitionMethod(relationId);
+if (partitionMethod == DISTRIBUTE_BY_RANGE)
+{
+rangeDistributedRelationCount++;
+}
+else if (partitionMethod == DISTRIBUTE_BY_HASH)
+{
+hashDistributedRelationCount++;
+}
+else
+{
+ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+errmsg("cannot push down this subquery"),
+errdetail("Currently range and hash partitioned "
+"relations are supported")));
+}
+}
+if ((rangeDistributedRelationCount > 0) && (hashDistributedRelationCount > 0))
+{
+ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+errmsg("cannot push down this subquery"),
+errdetail("A query including both range and hash "
+"partitioned relations are unsupported")));
+}
+foreach(relationIdCell, relationIdList)
+{
+Oid relationId = lfirst_oid(relationIdCell);
+bool coPartitionedTables = false;
+Oid currentRelationId = relationId;
+/* get shard list of first relation and continue for the next relation */
+if (relationIndex == 0)
+{
+firstTableRelationId = relationId;
+relationIndex++;
+continue;
+}
+/* check if this table has 1-1 shard partitioning with first table */
+coPartitionedTables = CoPartitionedTables(firstTableRelationId,
+currentRelationId);
+if (!coPartitionedTables)
+{
+ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+errmsg("cannot push down this subquery"),
+errdetail("Shards of relations in subquery need to "
+"have 1-to-1 shard partitioning")));
+}
+}
+}
+/*
+* CoPartitionedTables checks if given two distributed tables have 1-to-1 shard
+* partitioning. It uses shard interval array that are sorted on interval minimum
+* values. Then it compares every shard interval in order and if any pair of
+* shard intervals are not equal it returns false.
+*/
+static bool
+CoPartitionedTables(Oid firstRelationId, Oid secondRelationId)
+{
+bool coPartitionedTables = true;
+uint32 intervalIndex = 0;
+DistTableCacheEntry *firstTableCache = DistributedTableCacheEntry(firstRelationId);
+DistTableCacheEntry *secondTableCache = DistributedTableCacheEntry(secondRelationId);
+ShardInterval **sortedFirstIntervalArray = firstTableCache->sortedShardIntervalArray;
+ShardInterval **sortedSecondIntervalArray =
+secondTableCache->sortedShardIntervalArray;
+uint32 firstListShardCount = firstTableCache->shardIntervalArrayLength;
+uint32 secondListShardCount = secondTableCache->shardIntervalArrayLength;
+FmgrInfo *comparisonFunction = firstTableCache->shardIntervalCompareFunction;
+if (firstListShardCount != secondListShardCount)
+{
+return false;
+}
+/* if there are not any shards just return true */
+if (firstListShardCount == 0)
+{
+return true;
+}
+Assert(comparisonFunction != NULL);
+for (intervalIndex = 0; intervalIndex < firstListShardCount; intervalIndex++)
+{
+ShardInterval *firstInterval = sortedFirstIntervalArray[intervalIndex];
+ShardInterval *secondInterval = sortedSecondIntervalArray[intervalIndex];
+bool shardIntervalsEqual = ShardIntervalsEqual(comparisonFunction,
+firstInterval,
+secondInterval);
+if (!shardIntervalsEqual)
+{
+coPartitionedTables = false;
+break;
+}
+}
+return coPartitionedTables;
+}
+/*
+* ShardIntervalsEqual checks if given shard intervals have equal min/max values.
+*/
+static bool
+ShardIntervalsEqual(FmgrInfo *comparisonFunction, ShardInterval *firstInterval,
+ShardInterval *secondInterval)
+{
+bool shardIntervalsEqual = false;
+Datum firstMin = 0;
+Datum firstMax = 0;
+Datum secondMin = 0;
+Datum secondMax = 0;
+firstMin = firstInterval->minValue;
+firstMax = firstInterval->maxValue;
+secondMin = secondInterval->minValue;
+secondMax = secondInterval->maxValue;
+if (firstInterval->minValueExists && firstInterval->maxValueExists &&
+secondInterval->minValueExists && secondInterval->maxValueExists)
+{
+Datum minDatum = CompareCall2(comparisonFunction, firstMin, secondMin);
+Datum maxDatum = CompareCall2(comparisonFunction, firstMax, secondMax);
+int firstComparison = DatumGetInt32(minDatum);
+int secondComparison = DatumGetInt32(maxDatum);
+if (firstComparison == 0 && secondComparison == 0)
+{
+shardIntervalsEqual = true;
+}
+}
+return shardIntervalsEqual;
+}
 /*
 * SubqueryTaskCreate creates a sql task by replacing the target
 * shardInterval's boundary value.. Then performs the normal

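A sketch of what the check moved into SubquerySqlTaskList now enforces at
task-list generation time (the relation names below are hypothetical; the
error text is the one added above):

-- Joining two distributed tables whose shards do not line up 1-to-1 on
-- their min/max values now fails with:
SELECT count(*)
FROM (SELECT l.key FROM left_table l, right_table r WHERE l.key = r.key) sub;
-- ERROR: cannot push down this subquery
-- DETAIL: Shards of relations in subquery need to have 1-to-1 shard partitioning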

@@ -310,10 +310,9 @@ CreateDistributedPlan(PlannedStmt *localPlan, Query *originalQuery, Query *query
 */
 if ((!distributedPlan || distributedPlan->planningError) && !hasUnresolvedParams)
 {
-/* Create and optimize logical plan */
-MultiTreeRoot *logicalPlan = MultiLogicalPlanCreate(originalQuery, query);
-MultiLogicalPlanOptimize(logicalPlan, plannerRestrictionContext,
-originalQuery);
+MultiTreeRoot *logicalPlan = MultiLogicalPlanCreate(originalQuery, query,
+plannerRestrictionContext);
+MultiLogicalPlanOptimize(logicalPlan);
 /*
 * This check is here to make it likely that all node types used in


@@ -1366,3 +1366,29 @@ RelationRestrictionPartitionKeyIndex(RelationRestriction *relationRestriction)
 return InvalidAttrNumber;
 }
+/*
+* RelationIdList returns list of unique relation ids in query tree.
+*/
+List *
+RelationIdList(Query *query)
+{
+List *rangeTableList = NIL;
+List *tableEntryList = NIL;
+List *relationIdList = NIL;
+ListCell *tableEntryCell = NULL;
+ExtractRangeTableRelationWalker((Node *) query, &rangeTableList);
+tableEntryList = TableEntryList(rangeTableList);
+foreach(tableEntryCell, tableEntryList)
+{
+TableEntry *tableEntry = (TableEntry *) lfirst(tableEntryCell);
+Oid relationId = tableEntry->relationId;
+relationIdList = list_append_unique_oid(relationIdList, relationId);
+}
+return relationIdList;
+}


@@ -107,9 +107,7 @@ extern double CountDistinctErrorRate;
 /* Function declaration for optimizing logical plans */
-extern void MultiLogicalPlanOptimize(MultiTreeRoot *multiTree,
-PlannerRestrictionContext *plannerRestrictionContext,
-Query *originalQuery);
+extern void MultiLogicalPlanOptimize(MultiTreeRoot *multiTree);
 /* Function declaration for getting partition method for the given relation */
 extern char PartitionMethod(Oid relationId);


@@ -16,6 +16,7 @@
 #include "distributed/citus_nodes.h"
 #include "distributed/multi_join_order.h"
+#include "distributed/relation_restriction_equivalence.h"
 #include "nodes/nodes.h"
 #include "nodes/primnodes.h"
 #include "nodes/parsenodes.h"
@@ -180,7 +181,9 @@ extern bool SubqueryPushdown;
 /* Function declarations for building logical plans */
-extern MultiTreeRoot * MultiLogicalPlanCreate(Query *originalQuery, Query *queryTree);
+extern MultiTreeRoot * MultiLogicalPlanCreate(Query *originalQuery, Query *queryTree,
+PlannerRestrictionContext *
+plannerRestrictionContext);
 extern bool NeedsDistributedPlanning(Query *queryTree);
 extern MultiNode * ParentNode(MultiNode *multiNode);
 extern MultiNode * ChildNode(MultiUnaryNode *multiNode);


@@ -19,6 +19,7 @@ extern bool ContainsUnionSubquery(Query *queryTree);
 extern bool RestrictionEquivalenceForPartitionKeys(PlannerRestrictionContext *
 plannerRestrictionContext);
 extern bool SafeToPushdownUnionSubquery(RelationRestrictionContext *restrictionContext);
+extern List * RelationIdList(Query *query);
 #endif /* RELATION_RESTRICTION_EQUIVALENCE_H */


@@ -348,16 +348,14 @@ ERROR: cannot perform local joins that involve expressions
 DETAIL: local joins can be performed between columns only
 -- Check that we can issue limit/offset queries
 -- OFFSET in subqueries are not supported
--- Error in the planner when subquery pushdown is off
+-- Error in the planner when single repartition subquery
+SELECT * FROM (SELECT o_custkey FROM orders GROUP BY o_custkey ORDER BY o_custkey OFFSET 20) sq;
+ERROR: cannot perform distributed planning on this query
+DETAIL: Subqueries with offset are not supported yet
+-- Error in the optimizer when subquery pushdown is on
 SELECT * FROM (SELECT o_orderkey FROM orders ORDER BY o_orderkey OFFSET 20) sq;
 ERROR: cannot perform distributed planning on this query
 DETAIL: Subqueries with offset are not supported yet
-SET citus.subquery_pushdown TO true;
--- Error in the optimizer when subquery pushdown is on
-SELECT * FROM (SELECT o_orderkey FROM orders ORDER BY o_orderkey OFFSET 20) sq;
-ERROR: cannot push down this subquery
-DETAIL: Offset clause is currently unsupported
-SET citus.subquery_pushdown TO false;
 -- Simple LIMIT/OFFSET with ORDER BY
 SELECT o_orderkey FROM orders ORDER BY o_orderkey LIMIT 10 OFFSET 20;
 o_orderkey


@@ -802,8 +802,8 @@ SELECT * FROM (
 (SELECT * FROM articles_hash_mx WHERE author_id = 1)
 UNION
 (SELECT * FROM articles_hash_mx WHERE author_id = 2)) uu;
-ERROR: cannot perform distributed planning on this query
-DETAIL: Subqueries without group by clause are not supported yet
+ERROR: cannot push down this subquery
+DETAIL: Currently all leaf queries need to have same filters on partition column
 -- error out for queries with repartition jobs
 SELECT *
 FROM articles_hash_mx a, articles_hash_mx b


@@ -916,8 +916,8 @@ SELECT * FROM (
 (SELECT * FROM articles_hash WHERE author_id = 1)
 UNION
 (SELECT * FROM articles_hash WHERE author_id = 2)) uu;
-ERROR: cannot perform distributed planning on this query
-DETAIL: Subqueries without group by clause are not supported yet
+ERROR: cannot push down this subquery
+DETAIL: Currently all leaf queries need to have same filters on partition column
 -- error out for queries with repartition jobs
 SELECT *
 FROM articles_hash a, articles_hash b


@@ -6,7 +6,6 @@
 -- We don't need shard id sequence here, so commented out to prevent conflicts with concurrent tests
 -- ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1430000;
 -- ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1430000;
-SET citus.subquery_pushdown TO TRUE;
 SET citus.enable_router_execution TO FALSE;
 ------------------------------------
 -- Vanilla funnel query
@@ -1673,5 +1672,4 @@ WHERE b.user_id IS NULL
 GROUP BY a.user_id;
 ERROR: cannot push down this subquery
 DETAIL: Table expressions other than simple relations and subqueries are currently unsupported
-SET citus.subquery_pushdown TO FALSE;
 SET citus.enable_router_execution TO TRUE;


@@ -7,7 +7,6 @@
 -- ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1400000;
 ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1400000;
-SET citus.subquery_pushdown TO TRUE;
 SET citus.enable_router_execution TO FALSE;
 --
 -- UNIONs and JOINs mixed
@@ -2025,8 +2024,8 @@ FROM (
 ) user_id
 ORDER BY 1,
 2 limit 10;
-ERROR: cannot push down this subquery
-DETAIL: Window functions are currently unsupported
+ERROR: cannot perform distributed planning on this query
+DETAIL: Subqueries without group by clause are not supported yet
 -- not supported due to non relation rte
 SELECT ("final_query"."event_types") as types, count(*) AS sumOfEventType
 FROM
@@ -2113,5 +2112,4 @@ GROUP BY types
 ORDER BY types;
 ERROR: cannot push down this subquery
 DETAIL: Subqueries without relations are unsupported
-SET citus.subquery_pushdown TO FALSE;
 SET citus.enable_router_execution TO TRUE;


@@ -4,7 +4,6 @@
 -- the tables that are used depends to multi_insert_select_behavioral_analytics_create_table.sql
 -- We don't need shard id sequence here, so commented out to prevent conflicts with concurrent tests
 -- ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1400000;
-SET citus.subquery_pushdown TO true;
 SET citus.enable_router_execution TO false;
 -- a very simple union query
 SELECT user_id, counter
@@ -845,5 +844,4 @@ GROUP BY types
 ORDER BY types;
 ERROR: cannot push down this subquery
 DETAIL: Subqueries without relations are unsupported
-SET citus.subquery_pushdown TO false;
 SET citus.enable_router_execution TO true;


@@ -234,16 +234,33 @@ SET citus.task_executor_type to DEFAULT;
 -- create a view with aggregate
 CREATE VIEW lineitems_by_shipping_method AS
 SELECT l_shipmode, count(*) as cnt FROM lineitem_hash_part GROUP BY 1;
--- following will fail due to non-flattening of subquery due to GROUP BY
+-- following will fail due to non GROUP BY of partition key
 SELECT * FROM lineitems_by_shipping_method;
 ERROR: Unrecognized range table id 1
 -- create a view with group by on partition column
 CREATE VIEW lineitems_by_orderkey AS
-SELECT l_orderkey, count(*) FROM lineitem_hash_part GROUP BY 1;
--- this will also fail due to same reason
-SELECT * FROM lineitems_by_orderkey;
-ERROR: Unrecognized range table id 1
--- however it would work if it is made router plannable
+SELECT
+l_orderkey, count(*)
+FROM
+lineitem_hash_part
+GROUP BY 1;
+-- this should work since we're able to push down this query
+SELECT * FROM lineitems_by_orderkey ORDER BY 2 DESC, 1 ASC LIMIT 10;
+ l_orderkey | count
+------------+-------
+          7 |     7
+         68 |     7
+        129 |     7
+        164 |     7
+        194 |     7
+        225 |     7
+        226 |     7
+        322 |     7
+        326 |     7
+        354 |     7
+(10 rows)
+-- it would also work since it is made router plannable
 SELECT * FROM lineitems_by_orderkey WHERE l_orderkey = 100;
 l_orderkey | count
 ------------+-------


@@ -130,9 +130,9 @@ SELECT *
 SELECT *
 FROM (
 SELECT
-l_orderkey, count(DISTINCT 1)
+l_linenumber, count(DISTINCT 1)
 FROM lineitem_hash
-GROUP BY l_orderkey) sub
+GROUP BY l_linenumber) sub
 ORDER BY 2 DESC, 1 DESC
 LIMIT 10;
@@ -140,9 +140,9 @@ SELECT *
 SELECT *
 FROM (
 SELECT
-l_orderkey, count(DISTINCT (random() * 5)::int)
+l_linenumber, count(DISTINCT (random() * 5)::int)
 FROM lineitem_hash
-GROUP BY l_orderkey) sub
+GROUP BY l_linenumber) sub
 ORDER BY 2 DESC, 1 DESC
 LIMIT 10;
@@ -277,18 +277,18 @@ SELECT *
 SELECT *
 FROM (
 SELECT
-l_orderkey, sum(DISTINCT l_partkey)
+l_linenumber, sum(DISTINCT l_partkey)
 FROM lineitem_hash
-GROUP BY l_orderkey) sub
+GROUP BY l_linenumber) sub
 ORDER BY 2 DESC, 1 DESC
 LIMIT 10;
 SELECT *
 FROM (
 SELECT
-l_orderkey, avg(DISTINCT l_partkey)
+l_linenumber, avg(DISTINCT l_partkey)
 FROM lineitem_hash
-GROUP BY l_orderkey) sub
+GROUP BY l_linenumber) sub
 ORDER BY 2 DESC, 1 DESC
 LIMIT 10;
@@ -297,18 +297,18 @@ SELECT *
 SELECT *
 FROM (
 SELECT
-l_orderkey, count(DISTINCT lineitem_hash)
+l_linenumber, count(DISTINCT lineitem_hash)
 FROM lineitem_hash
-GROUP BY l_orderkey) sub
+GROUP BY l_linenumber) sub
 ORDER BY 2 DESC, 1 DESC
 LIMIT 10;
 SELECT *
 FROM (
 SELECT
-l_orderkey, count(DISTINCT lineitem_hash.*)
+l_linenumber, count(DISTINCT lineitem_hash.*)
 FROM lineitem_hash
-GROUP BY l_orderkey) sub
+GROUP BY l_linenumber) sub
 ORDER BY 2 DESC, 1 DESC
 LIMIT 10;


@@ -46,7 +46,7 @@ SELECT master_create_distributed_table('orders_subquery', 'o_orderkey', 'range')
 SET citus.enable_router_execution TO 'false';
--- Check that we don't allow subquery pushdown in default settings.
+-- Check that we allow subquery pushdown in default settings.
 SELECT
 avg(unit_price)
@@ -62,8 +62,6 @@ FROM
 GROUP BY
 l_orderkey) AS unit_prices;
-SET citus.subquery_pushdown to TRUE;
 -- Check that we don't crash if there are not any shards.
 SELECT
@@ -130,7 +128,8 @@ FROM
 UPDATE pg_dist_shard SET shardmaxvalue = '14947' WHERE shardid = 270003;
--- If group by is not on partition column then we error out.
+-- If group by is not on partition column then we error out from single table
+-- repartition code path
 SELECT
 avg(order_count)
@@ -227,6 +226,8 @@ FROM (
 count(*) a
 FROM
 lineitem_subquery
+GROUP BY
+l_orderkey
 ) z;
 -- Check supported subquery types.
@@ -337,7 +338,6 @@ CREATE TABLE subquery_pruning_varchar_test_table
 SELECT master_create_distributed_table('subquery_pruning_varchar_test_table', 'a', 'hash');
 SELECT master_create_worker_shards('subquery_pruning_varchar_test_table', 4, 1);
-SET citus.subquery_pushdown TO TRUE;
 SET client_min_messages TO DEBUG2;
 SELECT * FROM
@@ -925,8 +925,6 @@ WHERE shardid = :new_shard_id;
 \.
-SET citus.subquery_pushdown TO TRUE;
 -- Simple join subquery pushdown
 SELECT
 avg(array_length(events, 1)) AS event_average


@@ -241,9 +241,9 @@ SELECT *
 SELECT *
 FROM (
 SELECT
-l_orderkey, count(DISTINCT 1)
+l_linenumber, count(DISTINCT 1)
 FROM lineitem_hash
-GROUP BY l_orderkey) sub
+GROUP BY l_linenumber) sub
 ORDER BY 2 DESC, 1 DESC
 LIMIT 10;
 ERROR: cannot compute aggregate (distinct)
@@ -253,9 +253,9 @@ HINT: You can load the hll extension from contrib packages and enable distinct
 SELECT *
 FROM (
 SELECT
-l_orderkey, count(DISTINCT (random() * 5)::int)
+l_linenumber, count(DISTINCT (random() * 5)::int)
 FROM lineitem_hash
-GROUP BY l_orderkey) sub
+GROUP BY l_linenumber) sub
 ORDER BY 2 DESC, 1 DESC
 LIMIT 10;
 ERROR: cannot compute aggregate (distinct)
@@ -464,9 +464,9 @@ SELECT *
 SELECT *
 FROM (
 SELECT
-l_orderkey, sum(DISTINCT l_partkey)
+l_linenumber, sum(DISTINCT l_partkey)
 FROM lineitem_hash
-GROUP BY l_orderkey) sub
+GROUP BY l_linenumber) sub
 ORDER BY 2 DESC, 1 DESC
 LIMIT 10;
 ERROR: cannot compute aggregate (distinct)
@@ -474,9 +474,9 @@ DETAIL: Only count(distinct) aggregate is supported in subqueries
 SELECT *
 FROM (
 SELECT
-l_orderkey, avg(DISTINCT l_partkey)
+l_linenumber, avg(DISTINCT l_partkey)
 FROM lineitem_hash
-GROUP BY l_orderkey) sub
+GROUP BY l_linenumber) sub
 ORDER BY 2 DESC, 1 DESC
 LIMIT 10;
 ERROR: cannot compute aggregate (distinct)
@@ -486,9 +486,9 @@ DETAIL: Only count(distinct) aggregate is supported in subqueries
 SELECT *
 FROM (
 SELECT
-l_orderkey, count(DISTINCT lineitem_hash)
+l_linenumber, count(DISTINCT lineitem_hash)
 FROM lineitem_hash
-GROUP BY l_orderkey) sub
+GROUP BY l_linenumber) sub
 ORDER BY 2 DESC, 1 DESC
 LIMIT 10;
 ERROR: cannot compute count (distinct)
@@ -496,9 +496,9 @@ DETAIL: Non-column references are not supported yet
 SELECT *
 FROM (
 SELECT
-l_orderkey, count(DISTINCT lineitem_hash.*)
+l_linenumber, count(DISTINCT lineitem_hash.*)
 FROM lineitem_hash
-GROUP BY l_orderkey) sub
+GROUP BY l_linenumber) sub
 ORDER BY 2 DESC, 1 DESC
 LIMIT 10;
 ERROR: cannot compute count (distinct)


@@ -53,7 +53,7 @@ SELECT master_create_distributed_table('orders_subquery', 'o_orderkey', 'range')
 (1 row)
 SET citus.enable_router_execution TO 'false';
--- Check that we don't allow subquery pushdown in default settings.
+-- Check that we allow subquery pushdown in default settings.
 SELECT
 avg(unit_price)
 FROM
@@ -67,9 +67,11 @@ FROM
 l_orderkey = o_orderkey
 GROUP BY
 l_orderkey) AS unit_prices;
-ERROR: cannot perform distributed planning on this query
-DETAIL: Join in subqueries is not supported yet
-SET citus.subquery_pushdown to TRUE;
+ avg
+-----
+(1 row)
 -- Check that we don't crash if there are not any shards.
 SELECT
 avg(unit_price)
@@ -129,7 +131,8 @@ ERROR: cannot push down this subquery
 DETAIL: Shards of relations in subquery need to have 1-to-1 shard partitioning
 -- Update metadata in order to make all shards equal.
 UPDATE pg_dist_shard SET shardmaxvalue = '14947' WHERE shardid = 270003;
--- If group by is not on partition column then we error out.
+-- If group by is not on partition column then we error out from single table
+-- repartition code path
 SELECT
 avg(order_count)
 FROM
@@ -140,8 +143,8 @@ FROM
 lineitem_subquery
 GROUP BY
 l_suppkey) AS order_counts;
-ERROR: cannot push down this subquery
-DETAIL: Group by list without partition column is currently unsupported
+ERROR: cannot use real time executor with repartition jobs
+HINT: Set citus.task_executor_type to "task-tracker".
 -- Check that we error out if join is not on partition columns.
 SELECT
 avg(unit_price)
@@ -223,6 +226,8 @@ FROM (
 count(*) a
 FROM
 lineitem_subquery
+GROUP BY
+l_orderkey
 ) z;
 ERROR: cannot push down this subquery
 DETAIL: distinct in the outermost query is unsupported
@@ -357,7 +362,6 @@ SELECT master_create_worker_shards('subquery_pruning_varchar_test_table', 4, 1);
 (1 row)
-SET citus.subquery_pushdown TO TRUE;
 SET client_min_messages TO DEBUG2;
 SELECT * FROM
 (SELECT count(*) FROM subquery_pruning_varchar_test_table WHERE a = 'onder' GROUP BY a)
@@ -884,7 +888,6 @@ SELECT master_create_empty_shard('users') AS new_shard_id
 UPDATE pg_dist_shard SET shardminvalue = '(2,2000000001)', shardmaxvalue = '(2,4300000000)'
 WHERE shardid = :new_shard_id;
 \COPY users FROM STDIN WITH CSV
-SET citus.subquery_pushdown TO TRUE;
 -- Simple join subquery pushdown
 SELECT
 avg(array_length(events, 1)) AS event_average
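The expected output above also shows where a non-partition-column GROUP BY now lands: instead of failing pushdown validation, the subquery falls through to the single table repartition path, which the default real-time executor cannot run. A hedged sketch of the workaround the new HINT points at (the inner target list is reconstructed for illustration, following the surrounding test):

SET citus.task_executor_type TO 'task-tracker';
SELECT
avg(order_count)
FROM (
SELECT
l_suppkey, count(*) AS order_count
FROM
lineitem_subquery
GROUP BY
l_suppkey) AS order_counts;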
View File
@@ -53,7 +53,7 @@ SELECT master_create_distributed_table('orders_subquery', 'o_orderkey', 'range')
 (1 row)
 SET citus.enable_router_execution TO 'false';
--- Check that we don't allow subquery pushdown in default settings.
+-- Check that we allow subquery pushdown in default settings.
 SELECT
 avg(unit_price)
 FROM
@@ -67,9 +67,11 @@ FROM
 l_orderkey = o_orderkey
 GROUP BY
 l_orderkey) AS unit_prices;
-ERROR: cannot perform distributed planning on this query
-DETAIL: Join in subqueries is not supported yet
-SET citus.subquery_pushdown to TRUE;
+ avg
+-----
+(1 row)
 -- Check that we don't crash if there are not any shards.
 SELECT
 avg(unit_price)
@@ -129,7 +131,8 @@ ERROR: cannot push down this subquery
 DETAIL: Shards of relations in subquery need to have 1-to-1 shard partitioning
 -- Update metadata in order to make all shards equal.
 UPDATE pg_dist_shard SET shardmaxvalue = '14947' WHERE shardid = 270003;
--- If group by is not on partition column then we error out.
+-- If group by is not on partition column then we error out from single table
+-- repartition code path
 SELECT
 avg(order_count)
 FROM
@@ -140,8 +143,8 @@ FROM
 lineitem_subquery
 GROUP BY
 l_suppkey) AS order_counts;
-ERROR: cannot push down this subquery
-DETAIL: Group by list without partition column is currently unsupported
+ERROR: cannot use real time executor with repartition jobs
+HINT: Set citus.task_executor_type to "task-tracker".
 -- Check that we error out if join is not on partition columns.
 SELECT
 avg(unit_price)
@@ -223,6 +226,8 @@ FROM (
 count(*) a
 FROM
 lineitem_subquery
+GROUP BY
+l_orderkey
 ) z;
 ERROR: cannot push down this subquery
 DETAIL: distinct in the outermost query is unsupported
@@ -357,7 +362,6 @@ SELECT master_create_worker_shards('subquery_pruning_varchar_test_table', 4, 1);
 (1 row)
-SET citus.subquery_pushdown TO TRUE;
 SET client_min_messages TO DEBUG2;
 SELECT * FROM
 (SELECT count(*) FROM subquery_pruning_varchar_test_table WHERE a = 'onder' GROUP BY a)
@@ -884,7 +888,6 @@ SELECT master_create_empty_shard('users') AS new_shard_id
 UPDATE pg_dist_shard SET shardminvalue = '(2,2000000001)', shardmaxvalue = '(2,4300000000)'
 WHERE shardid = :new_shard_id;
 \COPY users FROM STDIN WITH CSV
-SET citus.subquery_pushdown TO TRUE;
 -- Simple join subquery pushdown
 SELECT
 avg(array_length(events, 1)) AS event_average
View File
@@ -161,13 +161,11 @@ SELECT count(*) FROM lineitem, orders WHERE l_orderkey + 1 = o_orderkey;
 -- Check that we can issue limit/offset queries
 -- OFFSET in subqueries are not supported
--- Error in the planner when subquery pushdown is off
-SELECT * FROM (SELECT o_orderkey FROM orders ORDER BY o_orderkey OFFSET 20) sq;
-SET citus.subquery_pushdown TO true;
+-- Error in the planner when single repartition subquery
+SELECT * FROM (SELECT o_custkey FROM orders GROUP BY o_custkey ORDER BY o_custkey OFFSET 20) sq;
 -- Error in the optimizer when subquery pushdown is on
 SELECT * FROM (SELECT o_orderkey FROM orders ORDER BY o_orderkey OFFSET 20) sq;
-SET citus.subquery_pushdown TO false;
 -- Simple LIMIT/OFFSET with ORDER BY
 SELECT o_orderkey FROM orders ORDER BY o_orderkey LIMIT 10 OFFSET 20;
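Both statements in the hunk above are expected to fail, only at different stages: the GROUP BY o_custkey variant needs a repartition, so the planner rejects the OFFSET, while the pushdown-eligible variant is rejected in the optimizer. A hedged sketch of the supported shape these tests contrast against, with the OFFSET hoisted out of the subquery so the coordinator can apply it:

-- OFFSET at the top level is applied on the coordinator, not in a subquery
SELECT o_orderkey FROM orders ORDER BY o_orderkey OFFSET 20;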
View File
@@ -8,7 +8,6 @@
 -- ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1430000;
 -- ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1430000;
-SET citus.subquery_pushdown TO TRUE;
 SET citus.enable_router_execution TO FALSE;
 ------------------------------------
@@ -1346,5 +1345,4 @@ FROM (SELECT
 WHERE b.user_id IS NULL
 GROUP BY a.user_id;
-SET citus.subquery_pushdown TO FALSE;
 SET citus.enable_router_execution TO TRUE;
View File
@@ -8,7 +8,6 @@
 -- ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1400000;
 ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1400000;
-SET citus.subquery_pushdown TO TRUE;
 SET citus.enable_router_execution TO FALSE;
 --
@@ -1870,5 +1869,4 @@ INNER JOIN
 GROUP BY types
 ORDER BY types;
-SET citus.subquery_pushdown TO FALSE;
 SET citus.enable_router_execution TO TRUE;
View File
@@ -6,7 +6,6 @@
 -- We don't need shard id sequence here, so commented out to prevent conflicts with concurrent tests
 -- ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1400000;
-SET citus.subquery_pushdown TO true;
 SET citus.enable_router_execution TO false;
 -- a very simple union query
 SELECT user_id, counter
@@ -673,5 +672,4 @@ FROM
 GROUP BY types
 ORDER BY types;
-SET citus.subquery_pushdown TO false;
 SET citus.enable_router_execution TO true;
View File
@@ -123,17 +123,21 @@ SET citus.task_executor_type to DEFAULT;
 CREATE VIEW lineitems_by_shipping_method AS
 SELECT l_shipmode, count(*) as cnt FROM lineitem_hash_part GROUP BY 1;
--- following will fail due to non-flattening of subquery due to GROUP BY
+-- following will fail due to non GROUP BY of partition key
 SELECT * FROM lineitems_by_shipping_method;
 -- create a view with group by on partition column
 CREATE VIEW lineitems_by_orderkey AS
-SELECT l_orderkey, count(*) FROM lineitem_hash_part GROUP BY 1;
--- this will also fail due to same reason
-SELECT * FROM lineitems_by_orderkey;
--- however it would work if it is made router plannable
+SELECT
+l_orderkey, count(*)
+FROM
+lineitem_hash_part
+GROUP BY 1;
+-- this should work since we're able to push down this query
+SELECT * FROM lineitems_by_orderkey ORDER BY 2 DESC, 1 ASC LIMIT 10;
+-- it would also work since it is made router plannable
 SELECT * FROM lineitems_by_orderkey WHERE l_orderkey = 100;
 DROP TABLE temp_lineitem CASCADE;
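Restating the rule the new comments above encode (assuming lineitem_hash_part is hash-distributed on l_orderkey, as in these tests): a view grouped by the distribution column can be pushed down or router-planned, while one grouped by another column still fails unless the query is pinned to a single shard key value. A hedged sketch of that workaround, written against the base table:

-- still fails: l_shipmode groups span shards
SELECT * FROM lineitems_by_shipping_method;
-- works: the partition-key filter makes the query router plannable
SELECT l_shipmode, count(*) AS cnt
FROM lineitem_hash_part
WHERE l_orderkey = 100
GROUP BY l_shipmode;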