mirror of https://github.com/citusdata/citus.git
Change behaviour of subquery pushdown flag
This commit changes the behaviour of the citus.subquery_pushdown flag. Previously, the flag had to be enabled to activate the subquery pushdown logic; with this commit, that behaviour is enabled by default, so the flag no longer has any effect. We keep the flag to avoid breaking backward compatibility, and we may repurpose it in later commits.
pull/1315/head
parent 897c9852a8
commit b72876fc8e
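The user-visible effect is easiest to see from SQL. The following is a minimal, hypothetical sketch (the distributed table "events" and its "user_id" distribution column are illustrative and not part of this commit): after this change the subquery pushdown logic is attempted by default, and toggling the GUC is kept only for backward compatibility.

-- Illustrative only: "events" is assumed to be a hash-distributed table
-- with distribution column "user_id".
-- Before this commit, the subquery below was only considered for pushdown
-- when citus.subquery_pushdown was enabled; now pushdown planning runs by
-- default, so both settings plan the query the same way.
SET citus.subquery_pushdown TO off;  -- no longer disables pushdown
SELECT user_id, count(*)
FROM (SELECT user_id FROM events WHERE user_id < 100) sub
GROUP BY user_id;

SET citus.subquery_pushdown TO on;   -- now effectively a no-op as well
SELECT user_id, count(*)
FROM (SELECT user_id FROM events WHERE user_id < 100) sub
GROUP BY user_id;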
@@ -47,7 +47,6 @@
#include "utils/fmgroids.h"
#include "utils/lsyscache.h"
#include "utils/rel.h"
#include "utils/relcache.h"
#include "utils/syscache.h"
#include "utils/tqual.h"

@@ -147,28 +146,6 @@ static bool TablePartitioningSupportsDistinct(List *tableNodeList,
Var *distinctColumn);
static bool GroupedByColumn(List *groupClauseList, List *targetList, Var *column);

/* Local functions forward declarations for subquery pushdown checks */
static void ErrorIfContainsUnsupportedSubquery(MultiNode *logicalPlanNode,
PlannerRestrictionContext *
plannerRestrictionContext,
Query *originalQuery);
static void ErrorIfCannotPushdownSubquery(Query *subqueryTree, bool outerQueryHasLimit);
static void ErrorIfUnsupportedSetOperation(Query *subqueryTree, bool outerQueryHasLimit);
static bool ExtractSetOperationStatmentWalker(Node *node, List **setOperationList);
static void ErrorIfUnsupportedTableCombination(Query *queryTree);
static bool TargetListOnPartitionColumn(Query *query, List *targetEntryList);
static FieldSelect * CompositeFieldRecursive(Expr *expression, Query *query);
static bool FullCompositeFieldList(List *compositeFieldList);
static void ErrorIfUnsupportedShardDistribution(Query *query);
static List * RelationIdList(Query *query);
static bool CoPartitionedTables(Oid firstRelationId, Oid secondRelationId);
static bool ShardIntervalsEqual(FmgrInfo *comparisonFunction,
ShardInterval *firstInterval,
ShardInterval *secondInterval);
static void ErrorIfUnsupportedFilters(Query *subquery);
static bool EqualOpExpressionLists(List *firstOpExpressionList,
List *secondOpExpressionList);

/* Local functions forward declarations for limit clauses */
static Node * WorkerLimitCount(MultiExtendedOp *originalOpNode);
static List * WorkerSortClauseList(MultiExtendedOp *originalOpNode);

@@ -191,16 +168,9 @@ static bool HasOrderByHllType(List *sortClauseList, List *targetList);
* Third, the function pulls up the collect operators in the tree. Fourth, the
* function finds the extended operator node, and splits this node into master
* and worker extended operator nodes.
*
* We also pass plannerRestrictionContext and originalQuery to the optimizer.
* These are primarily used to decide whether the subquery is safe to pushdown.
* If not, it helps to produce meaningful error messages for subquery
* pushdown planning.
*/
void
MultiLogicalPlanOptimize(MultiTreeRoot *multiLogicalPlan,
PlannerRestrictionContext *plannerRestrictionContext,
Query *originalQuery)
MultiLogicalPlanOptimize(MultiTreeRoot *multiLogicalPlan)
{
bool hasOrderByHllType = false;
List *selectNodeList = NIL;

@@ -219,10 +189,6 @@ MultiLogicalPlanOptimize(MultiTreeRoot *multiLogicalPlan,
/* check that we can optimize aggregates in the plan */
ErrorIfContainsUnsupportedAggregate(logicalPlanNode);

/* check that we can pushdown subquery in the plan */
ErrorIfContainsUnsupportedSubquery(logicalPlanNode, plannerRestrictionContext,
originalQuery);

/*
* If a select node exists, we use the idempower property to split the node
* into two nodes that contain And and Or clauses. If both And and Or nodes
@@ -2808,81 +2774,6 @@ GroupedByColumn(List *groupClauseList, List *targetList, Var *column)
}


/*
* ErrorIfContainsUnsupportedSubquery extracts subquery multi table from the
* logical plan using plannerRestrictionContext and the original query. It uses
* some helper functions to check if we can push down subquery to worker nodes.
* These helper functions error out if we cannot push down the subquery.
*/
static void
ErrorIfContainsUnsupportedSubquery(MultiNode *logicalPlanNode,
PlannerRestrictionContext *plannerRestrictionContext,
Query *originalQuery)
{
Query *subquery = NULL;
List *extendedOpNodeList = NIL;
MultiTable *multiTable = NULL;
MultiExtendedOp *extendedOpNode = NULL;
bool outerQueryHasLimit = false;
RelationRestrictionContext *relationRestrictionContext =
plannerRestrictionContext->relationRestrictionContext;

/* check if logical plan includes a subquery */
List *subqueryMultiTableList = SubqueryMultiTableList(logicalPlanNode);
if (subqueryMultiTableList == NIL)
{
return;
}

/* currently in the planner we only allow one subquery in from-clause*/
Assert(list_length(subqueryMultiTableList) == 1);

/*
* We're checking two things here:
* (i) If the query contains a top level union, ensure that all leaves
* return the partition key at the same position
* (ii) Else, check whether all relations joined on the partition key or not
*/
if (ContainsUnionSubquery(originalQuery))
{
if (!SafeToPushdownUnionSubquery(relationRestrictionContext))
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot pushdown the subquery since all leaves of "
"the UNION does not include partition key at the "
"same position"),
errdetail("Each leaf query of the UNION should return "
"partition key at the same position on its "
"target list.")));
}
}
else if (!RestrictionEquivalenceForPartitionKeys(plannerRestrictionContext))
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot pushdown the subquery since all relations are not "
"joined using distribution keys"),
errdetail("Each relation should be joined with at least "
"one another relation using distribution keys and "
"equality operator.")));
}

multiTable = (MultiTable *) linitial(subqueryMultiTableList);
subquery = multiTable->subquery;

extendedOpNodeList = FindNodesOfType(logicalPlanNode, T_MultiExtendedOp);
extendedOpNode = (MultiExtendedOp *) linitial(extendedOpNodeList);

if (extendedOpNode->limitCount)
{
outerQueryHasLimit = true;
}

ErrorIfCannotPushdownSubquery(subquery, outerQueryHasLimit);
ErrorIfUnsupportedShardDistribution(subquery);
ErrorIfUnsupportedFilters(subquery);
}


/*
* SubqueryMultiTableList extracts multi tables in the given logical plan tree
* and returns subquery multi tables in a new list.
@@ -2909,290 +2800,6 @@ SubqueryMultiTableList(MultiNode *multiNode)
}


/*
* ErrorIfCannotPushdownSubquery recursively checks if we can push down the given
* subquery to worker nodes. If we cannot push down the subquery, this function
* errors out.
*
* We can push down a subquery if it follows rules below. We support nested queries
* as long as they follow the same rules, and we recurse to validate each subquery
* for this given query.
* a. If there is an aggregate, it must be grouped on partition column.
* b. If there is a join, it must be between two regular tables or two subqueries.
* We don't support join between a regular table and a subquery. And columns on
* the join condition must be partition columns.
* c. If there is a distinct clause, it must be on the partition column.
*
* This function is very similar to ErrorIfQueryNotSupported() in logical
* planner, but we don't reuse it, because differently for subqueries we support
* a subset of distinct, union and left joins.
*
* Note that this list of checks is not exhaustive, there can be some cases
* which we let subquery to run but returned results would be wrong. Such as if
* a subquery has a group by on another subquery which includes order by with
* limit, we let this query to run, but results could be wrong depending on the
* features of underlying tables.
*/
static void
ErrorIfCannotPushdownSubquery(Query *subqueryTree, bool outerQueryHasLimit)
{
bool preconditionsSatisfied = true;
char *errorDetail = NULL;
List *subqueryEntryList = NIL;
ListCell *rangeTableEntryCell = NULL;

ErrorIfUnsupportedTableCombination(subqueryTree);

if (subqueryTree->hasSubLinks)
{
preconditionsSatisfied = false;
errorDetail = "Subqueries other than from-clause subqueries are unsupported";
}

if (subqueryTree->rtable == NIL)
{
preconditionsSatisfied = false;
errorDetail = "Subqueries without relations are unsupported";
}

if (subqueryTree->hasWindowFuncs)
{
preconditionsSatisfied = false;
errorDetail = "Window functions are currently unsupported";
}

if (subqueryTree->limitOffset)
{
preconditionsSatisfied = false;
errorDetail = "Offset clause is currently unsupported";
}

if (subqueryTree->limitCount && !outerQueryHasLimit)
{
preconditionsSatisfied = false;
errorDetail = "Limit in subquery without limit in the outer query is unsupported";
}

if (subqueryTree->setOperations)
{
ErrorIfUnsupportedSetOperation(subqueryTree, outerQueryHasLimit);
}

if (subqueryTree->hasRecursive)
{
preconditionsSatisfied = false;
errorDetail = "Recursive queries are currently unsupported";
}

if (subqueryTree->cteList)
{
preconditionsSatisfied = false;
errorDetail = "Common Table Expressions are currently unsupported";
}

if (subqueryTree->hasForUpdate)
{
preconditionsSatisfied = false;
errorDetail = "For Update/Share commands are currently unsupported";
}

/* group clause list must include partition column */
if (subqueryTree->groupClause)
{
List *groupClauseList = subqueryTree->groupClause;
List *targetEntryList = subqueryTree->targetList;
List *groupTargetEntryList = GroupTargetEntryList(groupClauseList,
targetEntryList);
bool groupOnPartitionColumn = TargetListOnPartitionColumn(subqueryTree,
groupTargetEntryList);
if (!groupOnPartitionColumn)
{
preconditionsSatisfied = false;
errorDetail = "Group by list without partition column is currently "
"unsupported";
}
}

/* we don't support aggregates without group by */
if (subqueryTree->hasAggs && (subqueryTree->groupClause == NULL))
{
preconditionsSatisfied = false;
errorDetail = "Aggregates without group by are currently unsupported";
}

/* having clause without group by on partition column is not supported */
if (subqueryTree->havingQual && (subqueryTree->groupClause == NULL))
{
preconditionsSatisfied = false;
errorDetail = "Having qual without group by on partition column is "
"currently unsupported";
}

/* distinct clause list must include partition column */
if (subqueryTree->distinctClause)
{
List *distinctClauseList = subqueryTree->distinctClause;
List *targetEntryList = subqueryTree->targetList;
List *distinctTargetEntryList = GroupTargetEntryList(distinctClauseList,
targetEntryList);
bool distinctOnPartitionColumn =
TargetListOnPartitionColumn(subqueryTree, distinctTargetEntryList);
if (!distinctOnPartitionColumn)
{
preconditionsSatisfied = false;
errorDetail = "Distinct on columns without partition column is "
"currently unsupported";
}
}

/* finally check and error out if not satisfied */
if (!preconditionsSatisfied)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot push down this subquery"),
errdetail("%s", errorDetail)));
}

/* recursively do same check for subqueries of this query */
subqueryEntryList = SubqueryEntryList(subqueryTree);
foreach(rangeTableEntryCell, subqueryEntryList)
{
RangeTblEntry *rangeTableEntry =
(RangeTblEntry *) lfirst(rangeTableEntryCell);

Query *innerSubquery = rangeTableEntry->subquery;
ErrorIfCannotPushdownSubquery(innerSubquery, outerQueryHasLimit);
}
}


/*
* ErrorIfUnsupportedSetOperation is a helper function for ErrorIfCannotPushdownSubquery().
* It basically iterates over the subqueries that reside under the given set operations.
*
* The function also errors out for set operations INTERSECT and EXCEPT.
*/
static void
ErrorIfUnsupportedSetOperation(Query *subqueryTree, bool outerQueryHasLimit)
{
List *rangeTableList = subqueryTree->rtable;
List *rangeTableIndexList = NIL;
ListCell *rangeTableIndexCell = NULL;
List *setOperationStatementList = NIL;
ListCell *setOperationStatmentCell = NULL;

ExtractSetOperationStatmentWalker((Node *) subqueryTree->setOperations,
&setOperationStatementList);
foreach(setOperationStatmentCell, setOperationStatementList)
{
SetOperationStmt *setOperation =
(SetOperationStmt *) lfirst(setOperationStatmentCell);

if (setOperation->op != SETOP_UNION)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot push down this subquery"),
errdetail("Intersect and Except are currently unsupported")));
}
}

ExtractRangeTableIndexWalker((Node *) subqueryTree->setOperations,
&rangeTableIndexList);
foreach(rangeTableIndexCell, rangeTableIndexList)
{
int rangeTableIndex = lfirst_int(rangeTableIndexCell);
RangeTblEntry *rangeTableEntry = rt_fetch(rangeTableIndex, rangeTableList);

Assert(rangeTableEntry->rtekind == RTE_SUBQUERY);

ErrorIfCannotPushdownSubquery(rangeTableEntry->subquery, outerQueryHasLimit);
}
}


/*
* ExtractSetOperationStatementWalker walks over a set operations statment,
* and finds all set operations in the tree.
*/
static bool
ExtractSetOperationStatmentWalker(Node *node, List **setOperationList)
{
bool walkerResult = false;
if (node == NULL)
{
return false;
}

if (IsA(node, SetOperationStmt))
{
SetOperationStmt *setOperation = (SetOperationStmt *) node;

(*setOperationList) = lappend(*setOperationList, setOperation);
}

walkerResult = expression_tree_walker(node, ExtractSetOperationStatmentWalker,
setOperationList);

return walkerResult;
}


/*
* ErrorIfUnsupportedTableCombination checks if the given query tree contains any
* unsupported range table combinations. For this, the function walks over all
* range tables in the join tree, and checks if they correspond to simple relations
* or subqueries.
*/
static void
ErrorIfUnsupportedTableCombination(Query *queryTree)
{
List *rangeTableList = queryTree->rtable;
List *joinTreeTableIndexList = NIL;
ListCell *joinTreeTableIndexCell = NULL;
bool unsupporteTableCombination = false;
char *errorDetail = NULL;

/*
* Extract all range table indexes from the join tree. Note that sub-queries
* that get pulled up by PostgreSQL don't appear in this join tree.
*/
ExtractRangeTableIndexWalker((Node *) queryTree->jointree, &joinTreeTableIndexList);
foreach(joinTreeTableIndexCell, joinTreeTableIndexList)
{
/*
* Join tree's range table index starts from 1 in the query tree. But,
* list indexes start from 0.
*/
int joinTreeTableIndex = lfirst_int(joinTreeTableIndexCell);
int rangeTableListIndex = joinTreeTableIndex - 1;

RangeTblEntry *rangeTableEntry =
(RangeTblEntry *) list_nth(rangeTableList, rangeTableListIndex);

/*
* Check if the range table in the join tree is a simple relation or a
* subquery.
*/
if (rangeTableEntry->rtekind != RTE_RELATION &&
rangeTableEntry->rtekind != RTE_SUBQUERY)
{
unsupporteTableCombination = true;
errorDetail = "Table expressions other than simple relations and "
"subqueries are currently unsupported";
break;
}
}

/* finally check and error out if not satisfied */
if (unsupporteTableCombination)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot push down this subquery"),
errdetail("%s", errorDetail)));
}
}


/*
* GroupTargetEntryList walks over group clauses in the given list, finds
* matching target entries and return them in a new list.
@@ -3215,53 +2822,6 @@ GroupTargetEntryList(List *groupClauseList, List *targetEntryList)
}


/*
* TargetListOnPartitionColumn checks if at least one target list entry is on
* partition column.
*/
static bool
TargetListOnPartitionColumn(Query *query, List *targetEntryList)
{
bool targetListOnPartitionColumn = false;
List *compositeFieldList = NIL;

ListCell *targetEntryCell = NULL;
foreach(targetEntryCell, targetEntryList)
{
TargetEntry *targetEntry = (TargetEntry *) lfirst(targetEntryCell);
Expr *targetExpression = targetEntry->expr;

bool isPartitionColumn = IsPartitionColumn(targetExpression, query);
if (isPartitionColumn)
{
FieldSelect *compositeField = CompositeFieldRecursive(targetExpression,
query);
if (compositeField)
{
compositeFieldList = lappend(compositeFieldList, compositeField);
}
else
{
targetListOnPartitionColumn = true;
break;
}
}
}

/* check composite fields */
if (!targetListOnPartitionColumn)
{
bool fullCompositeFieldList = FullCompositeFieldList(compositeFieldList);
if (fullCompositeFieldList)
{
targetListOnPartitionColumn = true;
}
}

return targetListOnPartitionColumn;
}


/*
* IsPartitionColumn returns true if the given column is a partition column.
* The function uses FindReferencedTableColumn to find the original relation
@@ -3415,386 +2975,6 @@ FindReferencedTableColumn(Expr *columnExpression, List *parentQueryList, Query *
}


/*
* CompositeFieldRecursive recursively finds composite field in the query tree
* referred by given expression. If expression does not refer to a composite
* field, then it returns NULL.
*
* If expression is a field select we directly return composite field. If it is
* a column is referenced from a subquery, then we recursively check that subquery
* until we reach the source of that column, and find composite field. If this
* column is referenced from join range table entry, then we resolve which join
* column it refers and recursively use this column with the same query.
*/
static FieldSelect *
CompositeFieldRecursive(Expr *expression, Query *query)
{
FieldSelect *compositeField = NULL;
List *rangetableList = query->rtable;
Index rangeTableEntryIndex = 0;
RangeTblEntry *rangeTableEntry = NULL;
Var *candidateColumn = NULL;

if (IsA(expression, FieldSelect))
{
compositeField = (FieldSelect *) expression;
return compositeField;
}

if (IsA(expression, Var))
{
candidateColumn = (Var *) expression;
}
else
{
return NULL;
}

rangeTableEntryIndex = candidateColumn->varno - 1;
rangeTableEntry = list_nth(rangetableList, rangeTableEntryIndex);

if (rangeTableEntry->rtekind == RTE_SUBQUERY)
{
Query *subquery = rangeTableEntry->subquery;
List *targetEntryList = subquery->targetList;
AttrNumber targetEntryIndex = candidateColumn->varattno - 1;
TargetEntry *subqueryTargetEntry = list_nth(targetEntryList, targetEntryIndex);

Expr *subqueryExpression = subqueryTargetEntry->expr;
compositeField = CompositeFieldRecursive(subqueryExpression, subquery);
}
else if (rangeTableEntry->rtekind == RTE_JOIN)
{
List *joinColumnList = rangeTableEntry->joinaliasvars;
AttrNumber joinColumnIndex = candidateColumn->varattno - 1;
Expr *joinColumn = list_nth(joinColumnList, joinColumnIndex);

compositeField = CompositeFieldRecursive(joinColumn, query);
}

return compositeField;
}


/*
* FullCompositeFieldList gets a composite field list, and checks if all fields
* of composite type are used in the list.
*/
static bool
FullCompositeFieldList(List *compositeFieldList)
{
bool fullCompositeFieldList = true;
bool *compositeFieldArray = NULL;
uint32 compositeFieldCount = 0;
uint32 fieldIndex = 0;

ListCell *fieldSelectCell = NULL;
foreach(fieldSelectCell, compositeFieldList)
{
FieldSelect *fieldSelect = (FieldSelect *) lfirst(fieldSelectCell);
uint32 compositeFieldIndex = 0;

Expr *fieldExpression = fieldSelect->arg;
if (!IsA(fieldExpression, Var))
{
continue;
}

if (compositeFieldArray == NULL)
{
uint32 index = 0;
Var *compositeColumn = (Var *) fieldExpression;
Oid compositeTypeId = compositeColumn->vartype;
Oid compositeRelationId = get_typ_typrelid(compositeTypeId);

/* get composite type attribute count */
Relation relation = relation_open(compositeRelationId, AccessShareLock);
compositeFieldCount = relation->rd_att->natts;
compositeFieldArray = palloc0(compositeFieldCount * sizeof(bool));
relation_close(relation, AccessShareLock);

for (index = 0; index < compositeFieldCount; index++)
{
compositeFieldArray[index] = false;
}
}

compositeFieldIndex = fieldSelect->fieldnum - 1;
compositeFieldArray[compositeFieldIndex] = true;
}

for (fieldIndex = 0; fieldIndex < compositeFieldCount; fieldIndex++)
{
if (!compositeFieldArray[fieldIndex])
{
fullCompositeFieldList = false;
}
}

if (compositeFieldCount == 0)
{
fullCompositeFieldList = false;
}

return fullCompositeFieldList;
}


/*
* ErrorIfUnsupportedShardDistribution gets list of relations in the given query
* and checks if two conditions below hold for them, otherwise it errors out.
* a. Every relation is distributed by range or hash. This means shards are
* disjoint based on the partition column.
* b. All relations have 1-to-1 shard partitioning between them. This means
* shard count for every relation is same and for every shard in a relation
* there is exactly one shard in other relations with same min/max values.
*/
static void
ErrorIfUnsupportedShardDistribution(Query *query)
{
Oid firstTableRelationId = InvalidOid;
List *relationIdList = RelationIdList(query);
ListCell *relationIdCell = NULL;
uint32 relationIndex = 0;
uint32 rangeDistributedRelationCount = 0;
uint32 hashDistributedRelationCount = 0;

foreach(relationIdCell, relationIdList)
{
Oid relationId = lfirst_oid(relationIdCell);
char partitionMethod = PartitionMethod(relationId);
if (partitionMethod == DISTRIBUTE_BY_RANGE)
{
rangeDistributedRelationCount++;
}
else if (partitionMethod == DISTRIBUTE_BY_HASH)
{
hashDistributedRelationCount++;
}
else
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot push down this subquery"),
errdetail("Currently range and hash partitioned "
"relations are supported")));
}
}

if ((rangeDistributedRelationCount > 0) && (hashDistributedRelationCount > 0))
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot push down this subquery"),
errdetail("A query including both range and hash "
"partitioned relations are unsupported")));
}

foreach(relationIdCell, relationIdList)
{
Oid relationId = lfirst_oid(relationIdCell);
bool coPartitionedTables = false;
Oid currentRelationId = relationId;

/* get shard list of first relation and continue for the next relation */
if (relationIndex == 0)
{
firstTableRelationId = relationId;
relationIndex++;

continue;
}

/* check if this table has 1-1 shard partitioning with first table */
coPartitionedTables = CoPartitionedTables(firstTableRelationId,
currentRelationId);
if (!coPartitionedTables)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot push down this subquery"),
errdetail("Shards of relations in subquery need to "
"have 1-to-1 shard partitioning")));
}
}
}


/*
* RelationIdList returns list of unique relation ids in query tree.
*/
static List *
RelationIdList(Query *query)
{
List *rangeTableList = NIL;
List *tableEntryList = NIL;
List *relationIdList = NIL;
ListCell *tableEntryCell = NULL;

ExtractRangeTableRelationWalker((Node *) query, &rangeTableList);
tableEntryList = TableEntryList(rangeTableList);

foreach(tableEntryCell, tableEntryList)
{
TableEntry *tableEntry = (TableEntry *) lfirst(tableEntryCell);
Oid relationId = tableEntry->relationId;

relationIdList = list_append_unique_oid(relationIdList, relationId);
}

return relationIdList;
}


/*
* CoPartitionedTables checks if given two distributed tables have 1-to-1 shard
* partitioning. It uses shard interval array that are sorted on interval minimum
* values. Then it compares every shard interval in order and if any pair of
* shard intervals are not equal it returns false.
*/
static bool
CoPartitionedTables(Oid firstRelationId, Oid secondRelationId)
{
bool coPartitionedTables = true;
uint32 intervalIndex = 0;
DistTableCacheEntry *firstTableCache = DistributedTableCacheEntry(firstRelationId);
DistTableCacheEntry *secondTableCache = DistributedTableCacheEntry(secondRelationId);
ShardInterval **sortedFirstIntervalArray = firstTableCache->sortedShardIntervalArray;
ShardInterval **sortedSecondIntervalArray =
secondTableCache->sortedShardIntervalArray;
uint32 firstListShardCount = firstTableCache->shardIntervalArrayLength;
uint32 secondListShardCount = secondTableCache->shardIntervalArrayLength;
FmgrInfo *comparisonFunction = firstTableCache->shardIntervalCompareFunction;

if (firstListShardCount != secondListShardCount)
{
return false;
}

/* if there are not any shards just return true */
if (firstListShardCount == 0)
{
return true;
}

Assert(comparisonFunction != NULL);

for (intervalIndex = 0; intervalIndex < firstListShardCount; intervalIndex++)
{
ShardInterval *firstInterval = sortedFirstIntervalArray[intervalIndex];
ShardInterval *secondInterval = sortedSecondIntervalArray[intervalIndex];

bool shardIntervalsEqual = ShardIntervalsEqual(comparisonFunction,
firstInterval,
secondInterval);
if (!shardIntervalsEqual)
{
coPartitionedTables = false;
break;
}
}

return coPartitionedTables;
}


/*
* ShardIntervalsEqual checks if given shard intervals have equal min/max values.
*/
static bool
ShardIntervalsEqual(FmgrInfo *comparisonFunction, ShardInterval *firstInterval,
ShardInterval *secondInterval)
{
bool shardIntervalsEqual = false;
Datum firstMin = 0;
Datum firstMax = 0;
Datum secondMin = 0;
Datum secondMax = 0;

firstMin = firstInterval->minValue;
firstMax = firstInterval->maxValue;
secondMin = secondInterval->minValue;
secondMax = secondInterval->maxValue;

if (firstInterval->minValueExists && firstInterval->maxValueExists &&
secondInterval->minValueExists && secondInterval->maxValueExists)
{
Datum minDatum = CompareCall2(comparisonFunction, firstMin, secondMin);
Datum maxDatum = CompareCall2(comparisonFunction, firstMax, secondMax);
int firstComparison = DatumGetInt32(minDatum);
int secondComparison = DatumGetInt32(maxDatum);

if (firstComparison == 0 && secondComparison == 0)
{
shardIntervalsEqual = true;
}
}

return shardIntervalsEqual;
}


/*
* ErrorIfUnsupportedFilters checks if all leaf queries in the given query have
* same filter on the partition column. Note that if there are queries without
* any filter on the partition column, they don't break this prerequisite.
*/
static void
ErrorIfUnsupportedFilters(Query *subquery)
{
List *queryList = NIL;
ListCell *queryCell = NULL;
List *subqueryOpExpressionList = NIL;
List *relationIdList = RelationIdList(subquery);

/*
* Get relation id of any relation in the subquery and create partiton column
* for this relation. We will use this column to replace columns on operator
* expressions on different tables. Then we compare these operator expressions
* to see if they consist of same operator and constant value.
*/
Oid relationId = linitial_oid(relationIdList);
Var *partitionColumn = PartitionColumn(relationId, 0);

ExtractQueryWalker((Node *) subquery, &queryList);
foreach(queryCell, queryList)
{
Query *query = (Query *) lfirst(queryCell);
List *opExpressionList = NIL;
List *newOpExpressionList = NIL;

bool leafQuery = LeafQuery(query);
if (!leafQuery)
{
continue;
}

opExpressionList = PartitionColumnOpExpressionList(query);
if (opExpressionList == NIL)
{
continue;
}

newOpExpressionList = ReplaceColumnsInOpExpressionList(opExpressionList,
partitionColumn);

if (subqueryOpExpressionList == NIL)
{
subqueryOpExpressionList = newOpExpressionList;
}
else
{
bool equalOpExpressionLists = EqualOpExpressionLists(subqueryOpExpressionList,
newOpExpressionList);
if (!equalOpExpressionLists)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot push down this subquery"),
errdetail("Currently all leaf queries need to "
"have same filters on partition column")));
}
}
}
}


/*
* ExtractQueryWalker walks over a query, and finds all queries in the query
* tree and returns these queries.

@@ -3993,51 +3173,6 @@ ReplaceColumnsInOpExpressionList(List *opExpressionList, Var *newColumn)
}


/*
* EqualOpExpressionLists checks if given two operator expression lists are
* equal.
*/
static bool
EqualOpExpressionLists(List *firstOpExpressionList, List *secondOpExpressionList)
{
bool equalOpExpressionLists = false;
ListCell *firstOpExpressionCell = NULL;
uint32 equalOpExpressionCount = 0;
uint32 firstOpExpressionCount = list_length(firstOpExpressionList);
uint32 secondOpExpressionCount = list_length(secondOpExpressionList);

if (firstOpExpressionCount != secondOpExpressionCount)
{
return false;
}

foreach(firstOpExpressionCell, firstOpExpressionList)
{
OpExpr *firstOpExpression = (OpExpr *) lfirst(firstOpExpressionCell);
ListCell *secondOpExpressionCell = NULL;

foreach(secondOpExpressionCell, secondOpExpressionList)
{
OpExpr *secondOpExpression = (OpExpr *) lfirst(secondOpExpressionCell);
bool equalExpressions = equal(firstOpExpression, secondOpExpression);

if (equalExpressions)
{
equalOpExpressionCount++;
continue;
}
}
}

if (equalOpExpressionCount == firstOpExpressionCount)
{
equalOpExpressionLists = true;
}

return equalOpExpressionLists;
}


/*
* WorkerLimitCount checks if the given extended node contains a limit node, and
* if that node can be pushed down. For this, the function checks if this limit
File diff suppressed because it is too large
@@ -122,6 +122,11 @@ static Job * BuildJobTreeTaskList(Job *jobTree,
PlannerRestrictionContext *plannerRestrictionContext);
static List * SubquerySqlTaskList(Job *job,
PlannerRestrictionContext *plannerRestrictionContext);
static void ErrorIfUnsupportedShardDistribution(Query *query);
static bool CoPartitionedTables(Oid firstRelationId, Oid secondRelationId);
static bool ShardIntervalsEqual(FmgrInfo *comparisonFunction,
ShardInterval *firstInterval,
ShardInterval *secondInterval);
static Task * SubqueryTaskCreate(Query *originalQuery, ShardInterval *shardInterval,
RelationRestrictionContext *restrictionContext,
uint32 taskId);

@@ -2009,6 +2014,9 @@ SubquerySqlTaskList(Job *job, PlannerRestrictionContext *plannerRestrictionConte
RelationRestrictionContext *relationRestrictionContext =
plannerRestrictionContext->relationRestrictionContext;

/* error if shards are not co-partitioned */
ErrorIfUnsupportedShardDistribution(subquery);

/* get list of all range tables in subquery tree */
ExtractRangeTableRelationWalker((Node *) subquery, &rangeTableList);


@@ -2060,6 +2068,171 @@ SubquerySqlTaskList(Job *job, PlannerRestrictionContext *plannerRestrictionConte
}


/*
* ErrorIfUnsupportedShardDistribution gets list of relations in the given query
* and checks if two conditions below hold for them, otherwise it errors out.
* a. Every relation is distributed by range or hash. This means shards are
* disjoint based on the partition column.
* b. All relations have 1-to-1 shard partitioning between them. This means
* shard count for every relation is same and for every shard in a relation
* there is exactly one shard in other relations with same min/max values.
*/
static void
ErrorIfUnsupportedShardDistribution(Query *query)
{
Oid firstTableRelationId = InvalidOid;
List *relationIdList = RelationIdList(query);
ListCell *relationIdCell = NULL;
uint32 relationIndex = 0;
uint32 rangeDistributedRelationCount = 0;
uint32 hashDistributedRelationCount = 0;

foreach(relationIdCell, relationIdList)
{
Oid relationId = lfirst_oid(relationIdCell);
char partitionMethod = PartitionMethod(relationId);
if (partitionMethod == DISTRIBUTE_BY_RANGE)
{
rangeDistributedRelationCount++;
}
else if (partitionMethod == DISTRIBUTE_BY_HASH)
{
hashDistributedRelationCount++;
}
else
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot push down this subquery"),
errdetail("Currently range and hash partitioned "
"relations are supported")));
}
}

if ((rangeDistributedRelationCount > 0) && (hashDistributedRelationCount > 0))
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot push down this subquery"),
errdetail("A query including both range and hash "
"partitioned relations are unsupported")));
}

foreach(relationIdCell, relationIdList)
{
Oid relationId = lfirst_oid(relationIdCell);
bool coPartitionedTables = false;
Oid currentRelationId = relationId;

/* get shard list of first relation and continue for the next relation */
if (relationIndex == 0)
{
firstTableRelationId = relationId;
relationIndex++;

continue;
}

/* check if this table has 1-1 shard partitioning with first table */
coPartitionedTables = CoPartitionedTables(firstTableRelationId,
currentRelationId);
if (!coPartitionedTables)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot push down this subquery"),
errdetail("Shards of relations in subquery need to "
"have 1-to-1 shard partitioning")));
}
}
}


/*
* CoPartitionedTables checks if given two distributed tables have 1-to-1 shard
* partitioning. It uses shard interval array that are sorted on interval minimum
* values. Then it compares every shard interval in order and if any pair of
* shard intervals are not equal it returns false.
*/
static bool
CoPartitionedTables(Oid firstRelationId, Oid secondRelationId)
{
bool coPartitionedTables = true;
uint32 intervalIndex = 0;
DistTableCacheEntry *firstTableCache = DistributedTableCacheEntry(firstRelationId);
DistTableCacheEntry *secondTableCache = DistributedTableCacheEntry(secondRelationId);
ShardInterval **sortedFirstIntervalArray = firstTableCache->sortedShardIntervalArray;
ShardInterval **sortedSecondIntervalArray =
secondTableCache->sortedShardIntervalArray;
uint32 firstListShardCount = firstTableCache->shardIntervalArrayLength;
uint32 secondListShardCount = secondTableCache->shardIntervalArrayLength;
FmgrInfo *comparisonFunction = firstTableCache->shardIntervalCompareFunction;

if (firstListShardCount != secondListShardCount)
{
return false;
}

/* if there are not any shards just return true */
if (firstListShardCount == 0)
{
return true;
}

Assert(comparisonFunction != NULL);

for (intervalIndex = 0; intervalIndex < firstListShardCount; intervalIndex++)
{
ShardInterval *firstInterval = sortedFirstIntervalArray[intervalIndex];
ShardInterval *secondInterval = sortedSecondIntervalArray[intervalIndex];

bool shardIntervalsEqual = ShardIntervalsEqual(comparisonFunction,
firstInterval,
secondInterval);
if (!shardIntervalsEqual)
{
coPartitionedTables = false;
break;
}
}

return coPartitionedTables;
}


/*
* ShardIntervalsEqual checks if given shard intervals have equal min/max values.
*/
static bool
ShardIntervalsEqual(FmgrInfo *comparisonFunction, ShardInterval *firstInterval,
ShardInterval *secondInterval)
{
bool shardIntervalsEqual = false;
Datum firstMin = 0;
Datum firstMax = 0;
Datum secondMin = 0;
Datum secondMax = 0;

firstMin = firstInterval->minValue;
firstMax = firstInterval->maxValue;
secondMin = secondInterval->minValue;
secondMax = secondInterval->maxValue;

if (firstInterval->minValueExists && firstInterval->maxValueExists &&
secondInterval->minValueExists && secondInterval->maxValueExists)
{
Datum minDatum = CompareCall2(comparisonFunction, firstMin, secondMin);
Datum maxDatum = CompareCall2(comparisonFunction, firstMax, secondMax);
int firstComparison = DatumGetInt32(minDatum);
int secondComparison = DatumGetInt32(maxDatum);

if (firstComparison == 0 && secondComparison == 0)
{
shardIntervalsEqual = true;
}
}

return shardIntervalsEqual;
}


/*
* SubqueryTaskCreate creates a sql task by replacing the target
* shardInterval's boundary value.. Then performs the normal
@@ -310,10 +310,9 @@ CreateDistributedPlan(PlannedStmt *localPlan, Query *originalQuery, Query *query
*/
if ((!distributedPlan || distributedPlan->planningError) && !hasUnresolvedParams)
{
/* Create and optimize logical plan */
MultiTreeRoot *logicalPlan = MultiLogicalPlanCreate(originalQuery, query);
MultiLogicalPlanOptimize(logicalPlan, plannerRestrictionContext,
originalQuery);
MultiTreeRoot *logicalPlan = MultiLogicalPlanCreate(originalQuery, query,
plannerRestrictionContext);
MultiLogicalPlanOptimize(logicalPlan);

/*
* This check is here to make it likely that all node types used in
@@ -1366,3 +1366,29 @@ RelationRestrictionPartitionKeyIndex(RelationRestriction *relationRestriction)

return InvalidAttrNumber;
}


/*
* RelationIdList returns list of unique relation ids in query tree.
*/
List *
RelationIdList(Query *query)
{
List *rangeTableList = NIL;
List *tableEntryList = NIL;
List *relationIdList = NIL;
ListCell *tableEntryCell = NULL;

ExtractRangeTableRelationWalker((Node *) query, &rangeTableList);
tableEntryList = TableEntryList(rangeTableList);

foreach(tableEntryCell, tableEntryList)
{
TableEntry *tableEntry = (TableEntry *) lfirst(tableEntryCell);
Oid relationId = tableEntry->relationId;

relationIdList = list_append_unique_oid(relationIdList, relationId);
}

return relationIdList;
}
@@ -107,9 +107,7 @@ extern double CountDistinctErrorRate;


/* Function declaration for optimizing logical plans */
extern void MultiLogicalPlanOptimize(MultiTreeRoot *multiTree,
PlannerRestrictionContext *plannerRestrictionContext,
Query *originalQuery);
extern void MultiLogicalPlanOptimize(MultiTreeRoot *multiTree);

/* Function declaration for getting partition method for the given relation */
extern char PartitionMethod(Oid relationId);
@@ -16,6 +16,7 @@

#include "distributed/citus_nodes.h"
#include "distributed/multi_join_order.h"
#include "distributed/relation_restriction_equivalence.h"
#include "nodes/nodes.h"
#include "nodes/primnodes.h"
#include "nodes/parsenodes.h"

@@ -180,7 +181,9 @@ extern bool SubqueryPushdown;


/* Function declarations for building logical plans */
extern MultiTreeRoot * MultiLogicalPlanCreate(Query *originalQuery, Query *queryTree);
extern MultiTreeRoot * MultiLogicalPlanCreate(Query *originalQuery, Query *queryTree,
PlannerRestrictionContext *
plannerRestrictionContext);
extern bool NeedsDistributedPlanning(Query *queryTree);
extern MultiNode * ParentNode(MultiNode *multiNode);
extern MultiNode * ChildNode(MultiUnaryNode *multiNode);
@@ -19,6 +19,7 @@ extern bool ContainsUnionSubquery(Query *queryTree);
extern bool RestrictionEquivalenceForPartitionKeys(PlannerRestrictionContext *
plannerRestrictionContext);
extern bool SafeToPushdownUnionSubquery(RelationRestrictionContext *restrictionContext);
extern List * RelationIdList(Query *query);


#endif /* RELATION_RESTRICTION_EQUIVALENCE_H */
@@ -349,16 +349,14 @@ ERROR: cannot perform local joins that involve expressions
DETAIL: local joins can be performed between columns only
-- Check that we can issue limit/offset queries
-- OFFSET in subqueries are not supported
-- Error in the planner when subquery pushdown is off
-- Error in the planner when single repartition subquery
SELECT * FROM (SELECT o_custkey FROM orders GROUP BY o_custkey ORDER BY o_custkey OFFSET 20) sq;
ERROR: cannot perform distributed planning on this query
DETAIL: Subqueries with offset are not supported yet
-- Error in the optimizer when subquery pushdown is on
SELECT * FROM (SELECT o_orderkey FROM orders ORDER BY o_orderkey OFFSET 20) sq;
ERROR: cannot perform distributed planning on this query
DETAIL: Subqueries with offset are not supported yet
SET citus.subquery_pushdown TO true;
-- Error in the optimizer when subquery pushdown is on
SELECT * FROM (SELECT o_orderkey FROM orders ORDER BY o_orderkey OFFSET 20) sq;
ERROR: cannot push down this subquery
DETAIL: Offset clause is currently unsupported
SET citus.subquery_pushdown TO false;
-- Simple LIMIT/OFFSET with ORDER BY
SELECT o_orderkey FROM orders ORDER BY o_orderkey LIMIT 10 OFFSET 20;
o_orderkey
@@ -864,8 +864,8 @@ SELECT * FROM (
(SELECT * FROM articles_hash_mx WHERE author_id = 1)
UNION
(SELECT * FROM articles_hash_mx WHERE author_id = 2)) uu;
ERROR: cannot perform distributed planning on this query
DETAIL: Subqueries without group by clause are not supported yet
ERROR: cannot push down this subquery
DETAIL: Currently all leaf queries need to have same filters on partition column
-- error out for queries with repartition jobs
SELECT *
FROM articles_hash_mx a, articles_hash_mx b
@@ -980,8 +980,8 @@ SELECT * FROM (
(SELECT * FROM articles_hash WHERE author_id = 1)
UNION
(SELECT * FROM articles_hash WHERE author_id = 2)) uu;
ERROR: cannot perform distributed planning on this query
DETAIL: Subqueries without group by clause are not supported yet
ERROR: cannot push down this subquery
DETAIL: Currently all leaf queries need to have same filters on partition column
-- error out for queries with repartition jobs
SELECT *
FROM articles_hash a, articles_hash b
@@ -6,7 +6,6 @@
-- We don't need shard id sequence here, so commented out to prevent conflicts with concurrent tests
-- ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1430000;
-- ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1430000;
SET citus.subquery_pushdown TO TRUE;
SET citus.enable_router_execution TO FALSE;
------------------------------------
-- Vanilla funnel query

@@ -1673,5 +1672,4 @@ WHERE b.user_id IS NULL
GROUP BY a.user_id;
ERROR: cannot push down this subquery
DETAIL: Table expressions other than simple relations and subqueries are currently unsupported
SET citus.subquery_pushdown TO FALSE;
SET citus.enable_router_execution TO TRUE;
@@ -7,7 +7,6 @@
-- ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1400000;
ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1400000;

SET citus.subquery_pushdown TO TRUE;
SET citus.enable_router_execution TO FALSE;
--
-- UNIONs and JOINs mixed

@@ -2025,8 +2024,8 @@ FROM (
) user_id
ORDER BY 1,
2 limit 10;
ERROR: cannot push down this subquery
DETAIL: Window functions are currently unsupported
ERROR: cannot perform distributed planning on this query
DETAIL: Subqueries without group by clause are not supported yet
-- not supported due to non relation rte
SELECT ("final_query"."event_types") as types, count(*) AS sumOfEventType
FROM

@@ -2113,5 +2112,4 @@ GROUP BY types
ORDER BY types;
ERROR: cannot push down this subquery
DETAIL: Subqueries without relations are unsupported
SET citus.subquery_pushdown TO FALSE;
SET citus.enable_router_execution TO TRUE;
@@ -4,7 +4,6 @@
-- the tables that are used depends to multi_insert_select_behavioral_analytics_create_table.sql
-- We don't need shard id sequence here, so commented out to prevent conflicts with concurrent tests
-- ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1400000;
SET citus.subquery_pushdown TO true;
SET citus.enable_router_execution TO false;
-- a very simple union query
SELECT user_id, counter

@@ -845,5 +844,4 @@ GROUP BY types
ORDER BY types;
ERROR: cannot push down this subquery
DETAIL: Subqueries without relations are unsupported
SET citus.subquery_pushdown TO false;
SET citus.enable_router_execution TO true;
@@ -234,16 +234,33 @@ SET citus.task_executor_type to DEFAULT;
-- create a view with aggregate
CREATE VIEW lineitems_by_shipping_method AS
SELECT l_shipmode, count(*) as cnt FROM lineitem_hash_part GROUP BY 1;
-- following will fail due to non-flattening of subquery due to GROUP BY
-- following will fail due to non GROUP BY of partition key
SELECT * FROM lineitems_by_shipping_method;
ERROR: Unrecognized range table id 1
-- create a view with group by on partition column
CREATE VIEW lineitems_by_orderkey AS
SELECT l_orderkey, count(*) FROM lineitem_hash_part GROUP BY 1;
-- this will also fail due to same reason
SELECT * FROM lineitems_by_orderkey;
ERROR: Unrecognized range table id 1
-- however it would work if it is made router plannable
SELECT
l_orderkey, count(*)
FROM
lineitem_hash_part
GROUP BY 1;
-- this should work since we're able to push down this query
SELECT * FROM lineitems_by_orderkey ORDER BY 2 DESC, 1 ASC LIMIT 10;
l_orderkey | count
------------+-------
7 | 7
68 | 7
129 | 7
164 | 7
194 | 7
225 | 7
226 | 7
322 | 7
326 | 7
354 | 7
(10 rows)

-- it would also work since it is made router plannable
SELECT * FROM lineitems_by_orderkey WHERE l_orderkey = 100;
l_orderkey | count
------------+-------
@@ -131,9 +131,9 @@ SELECT *
SELECT *
FROM (
SELECT
l_orderkey, count(DISTINCT 1)
l_linenumber, count(DISTINCT 1)
FROM lineitem_hash
GROUP BY l_orderkey) sub
GROUP BY l_linenumber) sub
ORDER BY 2 DESC, 1 DESC
LIMIT 10;

@@ -141,9 +141,9 @@ SELECT *
SELECT *
FROM (
SELECT
l_orderkey, count(DISTINCT (random() * 5)::int)
l_linenumber, count(DISTINCT (random() * 5)::int)
FROM lineitem_hash
GROUP BY l_orderkey) sub
GROUP BY l_linenumber) sub
ORDER BY 2 DESC, 1 DESC
LIMIT 10;

@@ -278,18 +278,18 @@ SELECT *
SELECT *
FROM (
SELECT
l_orderkey, sum(DISTINCT l_partkey)
l_linenumber, sum(DISTINCT l_partkey)
FROM lineitem_hash
GROUP BY l_orderkey) sub
GROUP BY l_linenumber) sub
ORDER BY 2 DESC, 1 DESC
LIMIT 10;

SELECT *
FROM (
SELECT
l_orderkey, avg(DISTINCT l_partkey)
l_linenumber, avg(DISTINCT l_partkey)
FROM lineitem_hash
GROUP BY l_orderkey) sub
GROUP BY l_linenumber) sub
ORDER BY 2 DESC, 1 DESC
LIMIT 10;

@@ -298,18 +298,18 @@ SELECT *
SELECT *
FROM (
SELECT
l_orderkey, count(DISTINCT lineitem_hash)
l_linenumber, count(DISTINCT lineitem_hash)
FROM lineitem_hash
GROUP BY l_orderkey) sub
GROUP BY l_linenumber) sub
ORDER BY 2 DESC, 1 DESC
LIMIT 10;

SELECT *
FROM (
SELECT
l_orderkey, count(DISTINCT lineitem_hash.*)
l_linenumber, count(DISTINCT lineitem_hash.*)
FROM lineitem_hash
GROUP BY l_orderkey) sub
GROUP BY l_linenumber) sub
ORDER BY 2 DESC, 1 DESC
LIMIT 10;
@@ -47,7 +47,7 @@ SELECT master_create_distributed_table('orders_subquery', 'o_orderkey', 'range')

SET citus.enable_router_execution TO 'false';

-- Check that we don't allow subquery pushdown in default settings.
-- Check that we allow subquery pushdown in default settings.

SELECT
avg(unit_price)

@@ -63,8 +63,6 @@ FROM
GROUP BY
l_orderkey) AS unit_prices;

SET citus.subquery_pushdown to TRUE;

-- Check that we don't crash if there are not any shards.

SELECT

@@ -131,7 +129,8 @@ FROM

UPDATE pg_dist_shard SET shardmaxvalue = '14947' WHERE shardid = 270003;

-- If group by is not on partition column then we error out.
-- If group by is not on partition column then we error out from single table
-- repartition code path

SELECT
avg(order_count)

@@ -228,6 +227,8 @@ FROM (
count(*) a
FROM
lineitem_subquery
GROUP BY
l_orderkey
) z;

-- Check supported subquery types.

@@ -338,7 +339,6 @@ CREATE TABLE subquery_pruning_varchar_test_table
SELECT master_create_distributed_table('subquery_pruning_varchar_test_table', 'a', 'hash');
SELECT master_create_worker_shards('subquery_pruning_varchar_test_table', 4, 1);

SET citus.subquery_pushdown TO TRUE;
SET client_min_messages TO DEBUG2;

SELECT * FROM

@@ -926,8 +926,6 @@ WHERE shardid = :new_shard_id;
\.


SET citus.subquery_pushdown TO TRUE;

-- Simple join subquery pushdown
SELECT
avg(array_length(events, 1)) AS event_average
@@ -242,9 +242,9 @@ SELECT *
SELECT *
FROM (
SELECT
l_orderkey, count(DISTINCT 1)
l_linenumber, count(DISTINCT 1)
FROM lineitem_hash
GROUP BY l_orderkey) sub
GROUP BY l_linenumber) sub
ORDER BY 2 DESC, 1 DESC
LIMIT 10;
ERROR: cannot compute aggregate (distinct)

@@ -254,9 +254,9 @@ HINT: You can load the hll extension from contrib packages and enable distinct
SELECT *
FROM (
SELECT
l_orderkey, count(DISTINCT (random() * 5)::int)
l_linenumber, count(DISTINCT (random() * 5)::int)
FROM lineitem_hash
GROUP BY l_orderkey) sub
GROUP BY l_linenumber) sub
ORDER BY 2 DESC, 1 DESC
LIMIT 10;
ERROR: cannot compute aggregate (distinct)

@@ -465,9 +465,9 @@ SELECT *
SELECT *
FROM (
SELECT
l_orderkey, sum(DISTINCT l_partkey)
l_linenumber, sum(DISTINCT l_partkey)
FROM lineitem_hash
GROUP BY l_orderkey) sub
GROUP BY l_linenumber) sub
ORDER BY 2 DESC, 1 DESC
LIMIT 10;
ERROR: cannot compute aggregate (distinct)

@@ -475,9 +475,9 @@ DETAIL: Only count(distinct) aggregate is supported in subqueries
SELECT *
FROM (
SELECT
l_orderkey, avg(DISTINCT l_partkey)
l_linenumber, avg(DISTINCT l_partkey)
FROM lineitem_hash
GROUP BY l_orderkey) sub
GROUP BY l_linenumber) sub
ORDER BY 2 DESC, 1 DESC
LIMIT 10;
ERROR: cannot compute aggregate (distinct)

@@ -487,9 +487,9 @@ DETAIL: Only count(distinct) aggregate is supported in subqueries
SELECT *
FROM (
SELECT
l_orderkey, count(DISTINCT lineitem_hash)
l_linenumber, count(DISTINCT lineitem_hash)
FROM lineitem_hash
GROUP BY l_orderkey) sub
GROUP BY l_linenumber) sub
ORDER BY 2 DESC, 1 DESC
LIMIT 10;
ERROR: cannot compute count (distinct)

@@ -497,9 +497,9 @@ DETAIL: Non-column references are not supported yet
SELECT *
FROM (
SELECT
l_orderkey, count(DISTINCT lineitem_hash.*)
l_linenumber, count(DISTINCT lineitem_hash.*)
FROM lineitem_hash
GROUP BY l_orderkey) sub
GROUP BY l_linenumber) sub
ORDER BY 2 DESC, 1 DESC
LIMIT 10;
ERROR: cannot compute count (distinct)

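Read together, the error details above pin down what a pushed-down subquery may contain: count(DISTINCT column) is accepted, while other distinct aggregates and distinct over non-column expressions are rejected. A minimal sketch of the accepted form, reusing the lineitem_hash table from these tests:

-- Sketch only: count(DISTINCT <plain column>) is the one distinct aggregate
-- these tests show as supported inside a subquery; sum/avg(DISTINCT ...) and
-- count(DISTINCT <expression or row reference>) hit the errors above.
SELECT *
FROM (
    SELECT
        l_orderkey, count(DISTINCT l_partkey)
    FROM lineitem_hash
    GROUP BY l_orderkey) sub
ORDER BY 2 DESC, 1 DESC
LIMIT 10;
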
@@ -54,7 +54,7 @@ SELECT master_create_distributed_table('orders_subquery', 'o_orderkey', 'range')
(1 row)

SET citus.enable_router_execution TO 'false';
-- Check that we don't allow subquery pushdown in default settings.
-- Check that we allow subquery pushdown in default settings.
SELECT
avg(unit_price)
FROM

@@ -68,9 +68,11 @@ FROM
l_orderkey = o_orderkey
GROUP BY
l_orderkey) AS unit_prices;
ERROR: cannot perform distributed planning on this query
DETAIL: Join in subqueries is not supported yet
SET citus.subquery_pushdown to TRUE;
avg
-----

(1 row)

-- Check that we don't crash if there are not any shards.
SELECT
avg(unit_price)

@@ -130,7 +132,8 @@ ERROR: cannot push down this subquery
DETAIL: Shards of relations in subquery need to have 1-to-1 shard partitioning
-- Update metadata in order to make all shards equal.
UPDATE pg_dist_shard SET shardmaxvalue = '14947' WHERE shardid = 270003;
-- If group by is not on partition column then we error out.
-- If group by is not on partition column then we error out from single table
-- repartition code path
SELECT
avg(order_count)
FROM

@@ -141,8 +144,8 @@ FROM
lineitem_subquery
GROUP BY
l_suppkey) AS order_counts;
ERROR: cannot push down this subquery
DETAIL: Group by list without partition column is currently unsupported
ERROR: cannot use real time executor with repartition jobs
HINT: Set citus.task_executor_type to "task-tracker".
-- Check that we error out if join is not on partition columns.
SELECT
avg(unit_price)

@@ -224,6 +227,8 @@ FROM (
count(*) a
FROM
lineitem_subquery
GROUP BY
l_orderkey
) z;
ERROR: cannot push down this subquery
DETAIL: distinct in the outermost query is unsupported

@@ -358,7 +363,6 @@ SELECT master_create_worker_shards('subquery_pruning_varchar_test_table', 4, 1);

(1 row)

SET citus.subquery_pushdown TO TRUE;
SET client_min_messages TO DEBUG2;
SELECT * FROM
(SELECT count(*) FROM subquery_pruning_varchar_test_table WHERE a = 'onder' GROUP BY a)

@@ -891,7 +895,6 @@ SELECT master_create_empty_shard('users') AS new_shard_id
UPDATE pg_dist_shard SET shardminvalue = '(2,2000000001)', shardmaxvalue = '(2,4300000000)'
WHERE shardid = :new_shard_id;
\COPY users FROM STDIN WITH CSV
SET citus.subquery_pushdown TO TRUE;
-- Simple join subquery pushdown
SELECT
avg(array_length(events, 1)) AS event_average

@@ -54,7 +54,7 @@ SELECT master_create_distributed_table('orders_subquery', 'o_orderkey', 'range')
(1 row)

SET citus.enable_router_execution TO 'false';
-- Check that we don't allow subquery pushdown in default settings.
-- Check that we allow subquery pushdown in default settings.
SELECT
avg(unit_price)
FROM

@@ -68,9 +68,11 @@ FROM
l_orderkey = o_orderkey
GROUP BY
l_orderkey) AS unit_prices;
ERROR: cannot perform distributed planning on this query
DETAIL: Join in subqueries is not supported yet
SET citus.subquery_pushdown to TRUE;
avg
-----

(1 row)

-- Check that we don't crash if there are not any shards.
SELECT
avg(unit_price)

@@ -130,7 +132,8 @@ ERROR: cannot push down this subquery
DETAIL: Shards of relations in subquery need to have 1-to-1 shard partitioning
-- Update metadata in order to make all shards equal.
UPDATE pg_dist_shard SET shardmaxvalue = '14947' WHERE shardid = 270003;
-- If group by is not on partition column then we error out.
-- If group by is not on partition column then we error out from single table
-- repartition code path
SELECT
avg(order_count)
FROM

@@ -141,8 +144,8 @@ FROM
lineitem_subquery
GROUP BY
l_suppkey) AS order_counts;
ERROR: cannot push down this subquery
DETAIL: Group by list without partition column is currently unsupported
ERROR: cannot use real time executor with repartition jobs
HINT: Set citus.task_executor_type to "task-tracker".
-- Check that we error out if join is not on partition columns.
SELECT
avg(unit_price)

@@ -224,6 +227,8 @@ FROM (
count(*) a
FROM
lineitem_subquery
GROUP BY
l_orderkey
) z;
ERROR: cannot push down this subquery
DETAIL: distinct in the outermost query is unsupported

@@ -358,7 +363,6 @@ SELECT master_create_worker_shards('subquery_pruning_varchar_test_table', 4, 1);

(1 row)

SET citus.subquery_pushdown TO TRUE;
SET client_min_messages TO DEBUG2;
SELECT * FROM
(SELECT count(*) FROM subquery_pruning_varchar_test_table WHERE a = 'onder' GROUP BY a)

@@ -891,7 +895,6 @@ SELECT master_create_empty_shard('users') AS new_shard_id
UPDATE pg_dist_shard SET shardminvalue = '(2,2000000001)', shardmaxvalue = '(2,4300000000)'
WHERE shardid = :new_shard_id;
\COPY users FROM STDIN WITH CSV
SET citus.subquery_pushdown TO TRUE;
-- Simple join subquery pushdown
SELECT
avg(array_length(events, 1)) AS event_average

@@ -162,13 +162,11 @@ SELECT count(*) FROM lineitem, orders WHERE l_orderkey + 1 = o_orderkey;
-- Check that we can issue limit/offset queries

-- OFFSET in subqueries are not supported
-- Error in the planner when subquery pushdown is off
SELECT * FROM (SELECT o_orderkey FROM orders ORDER BY o_orderkey OFFSET 20) sq;
SET citus.subquery_pushdown TO true;
-- Error in the planner when single repartition subquery
SELECT * FROM (SELECT o_custkey FROM orders GROUP BY o_custkey ORDER BY o_custkey OFFSET 20) sq;

-- Error in the optimizer when subquery pushdown is on
SELECT * FROM (SELECT o_orderkey FROM orders ORDER BY o_orderkey OFFSET 20) sq;
SET citus.subquery_pushdown TO false;

-- Simple LIMIT/OFFSET with ORDER BY
SELECT o_orderkey FROM orders ORDER BY o_orderkey LIMIT 10 OFFSET 20;

@@ -8,7 +8,6 @@
-- ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1430000;
-- ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1430000;

SET citus.subquery_pushdown TO TRUE;
SET citus.enable_router_execution TO FALSE;

------------------------------------

@@ -1346,5 +1345,4 @@ FROM (SELECT
WHERE b.user_id IS NULL
GROUP BY a.user_id;

SET citus.subquery_pushdown TO FALSE;
SET citus.enable_router_execution TO TRUE;

@@ -8,7 +8,6 @@
-- ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1400000;
ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1400000;

SET citus.subquery_pushdown TO TRUE;
SET citus.enable_router_execution TO FALSE;

--

@@ -1870,5 +1869,4 @@ INNER JOIN
GROUP BY types
ORDER BY types;

SET citus.subquery_pushdown TO FALSE;
SET citus.enable_router_execution TO TRUE;

@@ -6,7 +6,6 @@
-- We don't need shard id sequence here, so commented out to prevent conflicts with concurrent tests
-- ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1400000;

SET citus.subquery_pushdown TO true;
SET citus.enable_router_execution TO false;
-- a very simple union query
SELECT user_id, counter

@@ -673,5 +672,4 @@ FROM
GROUP BY types
ORDER BY types;

SET citus.subquery_pushdown TO false;
SET citus.enable_router_execution TO true;

@@ -123,17 +123,21 @@ SET citus.task_executor_type to DEFAULT;
CREATE VIEW lineitems_by_shipping_method AS
SELECT l_shipmode, count(*) as cnt FROM lineitem_hash_part GROUP BY 1;

-- following will fail due to non-flattening of subquery due to GROUP BY
-- following will fail due to non GROUP BY of partition key
SELECT * FROM lineitems_by_shipping_method;

-- create a view with group by on partition column
CREATE VIEW lineitems_by_orderkey AS
SELECT l_orderkey, count(*) FROM lineitem_hash_part GROUP BY 1;
SELECT
l_orderkey, count(*)
FROM
lineitem_hash_part
GROUP BY 1;

-- this will also fail due to same reason
SELECT * FROM lineitems_by_orderkey;
-- this should work since we're able to push down this query
SELECT * FROM lineitems_by_orderkey ORDER BY 2 DESC, 1 ASC LIMIT 10;

-- however it would work if it is made router plannable
-- it would also work since it is made router plannable
SELECT * FROM lineitems_by_orderkey WHERE l_orderkey = 100;

DROP TABLE temp_lineitem CASCADE;

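A minimal sketch of the distinction these view tests rely on, assuming lineitem_hash_part is hash-distributed on l_orderkey: a view whose GROUP BY contains the distribution column can be pushed down as a whole, and a query over it can additionally be router planned when it filters to a single distribution-column value. The view name below is made up for illustration.

-- Sketch only: the GROUP BY includes the distribution column, so the view
-- can be evaluated shard by shard and pushed down.
CREATE VIEW lineitems_per_order AS
SELECT l_orderkey, count(*) AS cnt
FROM lineitem_hash_part
GROUP BY l_orderkey;

-- Pushed down across all shards.
SELECT * FROM lineitems_per_order ORDER BY cnt DESC, l_orderkey ASC LIMIT 10;

-- Router planned: the filter restricts the query to a single shard.
SELECT * FROM lineitems_per_order WHERE l_orderkey = 100;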