Fix union pushdown issue (#5079)

* Fix UNION not being pushed down

Postgres optimizes away column fields that are not needed in the output.
We were relying on these fields to determine whether it is safe to push
down a union query.

This fix looks at the parse query, which has the original column fields
to detect if it is safe to push down a union query.

* Add more tests

* Simplify code and make it more robust

* Process varlevelsup > 0 in FindReferencedTableColumn

* Only look for outer vars in union path

* Add more comments

* Remove UNION ALL specific logic for pulling up childvars
pull/5146/head
SaitTalhaNisanci 2021-07-29 13:52:55 +03:00 committed by GitHub
parent 2aa67421a7
commit 4559d02c41
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 1201 additions and 104 deletions

View File

@ -1136,7 +1136,8 @@ HasUnsupportedDistinctOn(Query *query)
TargetEntry *distinctEntry = get_sortgroupclause_tle(distinctClause,
query->targetList);
if (IsPartitionColumn(distinctEntry->expr, query))
bool skipOuterVars = true;
if (IsPartitionColumn(distinctEntry->expr, query, skipOuterVars))
{
return false;
}
@ -1170,7 +1171,6 @@ InsertPartitionColumnMatchesSelect(Query *query, RangeTblEntry *insertRte,
{
TargetEntry *targetEntry = (TargetEntry *) lfirst(targetEntryCell);
List *insertTargetEntryColumnList = pull_var_clause_default((Node *) targetEntry);
Oid subqueryPartitionColumnRelationId = InvalidOid;
Var *subqueryPartitionColumn = NULL;
/*
@ -1202,11 +1202,18 @@ InsertPartitionColumnMatchesSelect(Query *query, RangeTblEntry *insertRte,
insertVar->varattno - 1);
Expr *selectTargetExpr = subqueryTargetEntry->expr;
RangeTblEntry *subqueryPartitionColumnRelationIdRTE = NULL;
List *parentQueryList = list_make2(query, subquery);
bool skipOuterVars = true;
FindReferencedTableColumn(selectTargetExpr,
parentQueryList, subquery,
&subqueryPartitionColumnRelationId,
&subqueryPartitionColumn);
&subqueryPartitionColumn,
&subqueryPartitionColumnRelationIdRTE,
skipOuterVars);
Oid subqueryPartitionColumnRelationId = subqueryPartitionColumnRelationIdRTE ?
subqueryPartitionColumnRelationIdRTE->
relid :
InvalidOid;
/*
* Corresponding (i.e., in the same ordinal position as the target table's
@ -1339,7 +1346,7 @@ InsertPartitionColumnMatchesSelect(Query *query, RangeTblEntry *insertRte,
}
/* finally, check that the select target column is a partition column */
if (!IsPartitionColumn(selectTargetExpr, subquery))
if (!IsPartitionColumn(selectTargetExpr, subquery, skipOuterVars))
{
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"cannot perform distributed INSERT INTO ... SELECT "

View File

@ -327,7 +327,7 @@ static bool ShouldProcessDistinctOrderAndLimitForWorker(
ExtendedOpNodeProperties *extendedOpNodeProperties,
bool pushingDownOriginalGrouping,
Node *havingQual);
static bool IsIndexInRange(const List *list, int index);
/*
* MultiLogicalPlanOptimize applies multi-relational algebra optimizations on
@ -4372,16 +4372,19 @@ GroupTargetEntryList(List *groupClauseList, List *targetEntryList)
* reference tables do not have partition column. The function does not
* support queries with CTEs, it would return false if columnExpression
* refers to a column returned by a CTE.
*
* If skipOuterVars is true, then it doesn't process the outervars.
*/
bool
IsPartitionColumn(Expr *columnExpression, Query *query)
IsPartitionColumn(Expr *columnExpression, Query *query, bool skipOuterVars)
{
bool isPartitionColumn = false;
Oid relationId = InvalidOid;
Var *column = NULL;
RangeTblEntry *relationRTE = NULL;
FindReferencedTableColumn(columnExpression, NIL, query, &relationId, &column);
FindReferencedTableColumn(columnExpression, NIL, query, &column, &relationRTE,
skipOuterVars);
Oid relationId = relationRTE ? relationRTE->relid : InvalidOid;
if (relationId != InvalidOid && column != NULL)
{
Var *partitionColumn = DistPartitionKey(relationId);
@ -4400,21 +4403,26 @@ IsPartitionColumn(Expr *columnExpression, Query *query)
/*
* FindReferencedTableColumn recursively traverses query tree to find actual relation
* id, and column that columnExpression refers to. If columnExpression is a
* non-relational or computed/derived expression, the function returns InvalidOid for
* relationId and NULL for column. The caller should provide parent query list from
* non-relational or computed/derived expression, the function returns NULL for
* rte and NULL for column. The caller should provide parent query list from
* top of the tree to this particular Query's parent. This argument is used to look
* into CTEs that may be present in the query.
*
* If skipOuterVars is true, then it doesn't check vars coming from outer queries.
* We probably don't need this skipOuterVars check but we wanted to be on the safe side
* and used it only in UNION path, we can separately work on verifying that it doesn't break
* anything existing.
*/
void
FindReferencedTableColumn(Expr *columnExpression, List *parentQueryList, Query *query,
Oid *relationId, Var **column)
Var **column, RangeTblEntry **rteContainingReferencedColumn,
bool skipOuterVars)
{
Var *candidateColumn = NULL;
List *rangetableList = query->rtable;
Expr *strippedColumnExpression = (Expr *) strip_implicit_coercions(
(Node *) columnExpression);
*relationId = InvalidOid;
*rteContainingReferencedColumn = NULL;
*column = NULL;
if (IsA(strippedColumnExpression, Var))
@ -4443,9 +4451,28 @@ FindReferencedTableColumn(Expr *columnExpression, List *parentQueryList, Query *
* subqueries in WHERE clause, we don't support use of partition keys
* in the subquery that is referred from the outer query.
*/
if (candidateColumn->varlevelsup > 0)
if (candidateColumn->varlevelsup > 0 && !skipOuterVars)
{
return;
int parentQueryIndex = list_length(parentQueryList) -
candidateColumn->varlevelsup;
if (!(IsIndexInRange(parentQueryList, parentQueryIndex)))
{
return;
}
/*
* Before we recurse into the query tree, we should update the candidateColumn and we use copy of it.
* As we get the query from varlevelsup up, we reset the varlevelsup.
*/
candidateColumn = copyObject(candidateColumn);
candidateColumn->varlevelsup = 0;
/*
* We should be careful about these fields because they need to
* be updated correctly based on ctelevelsup and varlevelsup.
*/
query = list_nth(parentQueryList, parentQueryIndex);
parentQueryList = list_truncate(parentQueryList, parentQueryIndex);
}
if (candidateColumn->varattno == InvalidAttrNumber)
@ -4457,12 +4484,13 @@ FindReferencedTableColumn(Expr *columnExpression, List *parentQueryList, Query *
return;
}
List *rangetableList = query->rtable;
int rangeTableEntryIndex = candidateColumn->varno - 1;
RangeTblEntry *rangeTableEntry = list_nth(rangetableList, rangeTableEntryIndex);
if (rangeTableEntry->rtekind == RTE_RELATION)
{
*relationId = rangeTableEntry->relid;
*rteContainingReferencedColumn = rangeTableEntry;
*column = candidateColumn;
}
else if (rangeTableEntry->rtekind == RTE_SUBQUERY)
@ -4476,7 +4504,8 @@ FindReferencedTableColumn(Expr *columnExpression, List *parentQueryList, Query *
/* append current query to parent query list */
parentQueryList = lappend(parentQueryList, query);
FindReferencedTableColumn(subColumnExpression, parentQueryList,
subquery, relationId, column);
subquery, column, rteContainingReferencedColumn,
skipOuterVars);
}
else if (rangeTableEntry->rtekind == RTE_JOIN)
{
@ -4485,11 +4514,17 @@ FindReferencedTableColumn(Expr *columnExpression, List *parentQueryList, Query *
Expr *joinColumn = list_nth(joinColumnList, joinColumnIndex);
/* parent query list stays the same since still in the same query boundary */
FindReferencedTableColumn(joinColumn, parentQueryList, query,
relationId, column);
FindReferencedTableColumn(joinColumn, parentQueryList, query, column,
rteContainingReferencedColumn, skipOuterVars);
}
else if (rangeTableEntry->rtekind == RTE_CTE)
{
/*
* When outerVars are considered, we modify parentQueryList, so this
* logic might need to change when we support outervars in CTEs.
*/
Assert(!skipOuterVars);
int cteParentListIndex = list_length(parentQueryList) -
rangeTableEntry->ctelevelsup - 1;
Query *cteParentQuery = NULL;
@ -4501,7 +4536,7 @@ FindReferencedTableColumn(Expr *columnExpression, List *parentQueryList, Query *
* moment due to usage from IsPartitionColumn. Callers of that function
* do not have access to parent query list.
*/
if (cteParentListIndex >= 0)
if (IsIndexInRange(parentQueryList, cteParentListIndex))
{
cteParentQuery = list_nth(parentQueryList, cteParentListIndex);
cteList = cteParentQuery->cteList;
@ -4526,12 +4561,24 @@ FindReferencedTableColumn(Expr *columnExpression, List *parentQueryList, Query *
parentQueryList = lappend(parentQueryList, query);
FindReferencedTableColumn(targetEntry->expr, parentQueryList,
cteQuery, relationId, column);
cteQuery, column, rteContainingReferencedColumn,
skipOuterVars);
}
}
}
/*
* IsIndexInRange returns true if the given index is within the
* range of the given list.
*/
static bool
IsIndexInRange(const List *list, int index)
{
return index >= 0 && index < list_length(list);
}
/*
* ExtractQueryWalker walks over a query, and finds all queries in the query
* tree and returns these queries. Note that the function also recurses into

View File

@ -215,11 +215,15 @@ TargetListOnPartitionColumn(Query *query, List *targetEntryList)
TargetEntry *targetEntry = (TargetEntry *) lfirst(targetEntryCell);
Expr *targetExpression = targetEntry->expr;
bool isPartitionColumn = IsPartitionColumn(targetExpression, query);
Oid relationId = InvalidOid;
bool skipOuterVars = true;
bool isPartitionColumn = IsPartitionColumn(targetExpression, query,
skipOuterVars);
Var *column = NULL;
RangeTblEntry *rte = NULL;
FindReferencedTableColumn(targetExpression, NIL, query, &relationId, &column);
FindReferencedTableColumn(targetExpression, NIL, query, &column, &rte,
skipOuterVars);
Oid relationId = rte ? rte->relid : InvalidOid;
/*
* If the expression belongs to a non-distributed table continue searching for

View File

@ -358,7 +358,8 @@ AddPartitionKeyNotNullFilterToSelect(Query *subqery)
{
TargetEntry *targetEntry = lfirst(targetEntryCell);
if (IsPartitionColumn(targetEntry->expr, subqery) &&
bool skipOuterVars = true;
if (IsPartitionColumn(targetEntry->expr, subqery, skipOuterVars) &&
IsA(targetEntry->expr, Var))
{
targetPartitionColumnVar = (Var *) targetEntry->expr;

View File

@ -582,7 +582,7 @@ DeferErrorIfUnsupportedSubqueryPushdown(Query *originalQuery,
*/
if (ContainsUnionSubquery(originalQuery))
{
if (!SafeToPushdownUnionSubquery(plannerRestrictionContext))
if (!SafeToPushdownUnionSubquery(originalQuery, plannerRestrictionContext))
{
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"cannot pushdown the subquery since not all subqueries "
@ -1836,7 +1836,9 @@ PartitionColumnForPushedDownSubquery(Query *query)
Expr *targetExpression = targetEntry->expr;
if (IsA(targetExpression, Var))
{
bool isPartitionColumn = IsPartitionColumn(targetExpression, query);
bool skipOuterVars = true;
bool isPartitionColumn = IsPartitionColumn(targetExpression, query,
skipOuterVars);
if (isPartitionColumn)
{
Var *partitionColumn = copyObject((Var *) targetExpression);

View File

@ -1013,7 +1013,7 @@ ShouldRecursivelyPlanSetOperation(Query *query, RecursivePlanningContext *contex
PlannerRestrictionContext *filteredRestrictionContext =
FilterPlannerRestrictionForQuery(context->plannerRestrictionContext, query);
if (!SafeToPushdownUnionSubquery(filteredRestrictionContext))
if (!SafeToPushdownUnionSubquery(query, filteredRestrictionContext))
{
/*
* The distribution column is not in the same place in all sides

View File

@ -59,6 +59,12 @@ typedef struct AttributeEquivalenceClass
Index unionQueryPartitionKeyIndex;
} AttributeEquivalenceClass;
/*
 * FindQueryContainingRteIdentityContext is the walker state used by
 * FindQueryContainingRTEIdentityInternal while searching a query tree
 * for the (sub)query that holds a relation RTE with a given RTE identity.
 */
typedef struct FindQueryContainingRteIdentityContext
{
/* RTE identity (as returned by GetRTEIdentity) that is being searched for */
int targetRTEIdentity;
/*
 * Query currently being visited by the walker. When the walk succeeds this
 * is the query that contains the matching RTE; when it fails the walker
 * restores it to the value it had before the walk started.
 */
Query *query;
}FindQueryContainingRteIdentityContext;
/*
* AttributeEquivalenceClassMember - one member expression of an
* AttributeEquivalenceClass. The important thing to consider is that
@ -142,8 +148,8 @@ static void ListConcatUniqueAttributeClassMemberLists(AttributeEquivalenceClass
firstClass,
AttributeEquivalenceClass *
secondClass);
static Index RelationRestrictionPartitionKeyIndex(RelationRestriction *
relationRestriction);
static Var * PartitionKeyForRTEIdentityInQuery(Query *query, int targetRTEIndex,
Index *partitionKeyIndex);
static bool AllRelationsInRestrictionContextColocated(RelationRestrictionContext *
restrictionContext);
static bool IsNotSafeRestrictionToRecursivelyPlan(Node *node);
@ -155,6 +161,12 @@ static bool RangeTableArrayContainsAnyRTEIdentities(RangeTblEntry **rangeTableEn
queryRteIdentities);
static Relids QueryRteIdentities(Query *queryTree);
static Query * FindQueryContainingRTEIdentity(Query *mainQuery, int rteIndex);
static bool FindQueryContainingRTEIdentityInternal(Node *node,
FindQueryContainingRteIdentityContext *
context);
#if PG_VERSION_NUM >= PG_VERSION_13
static int ParentCountPriorToAppendRel(List *appendRelList, AppendRelInfo *appendRelInfo);
#endif
@ -194,7 +206,7 @@ AllDistributionKeysInQueryAreEqual(Query *originalQuery,
if (originalQuery->setOperations || ContainsUnionSubquery(originalQuery))
{
return SafeToPushdownUnionSubquery(plannerRestrictionContext);
return SafeToPushdownUnionSubquery(originalQuery, plannerRestrictionContext);
}
return false;
@ -244,7 +256,8 @@ ContextContainsLocalRelation(RelationRestrictionContext *restrictionContext)
* safe to push down, the function would fail to return true.
*/
bool
SafeToPushdownUnionSubquery(PlannerRestrictionContext *plannerRestrictionContext)
SafeToPushdownUnionSubquery(Query *originalQuery,
PlannerRestrictionContext *plannerRestrictionContext)
{
RelationRestrictionContext *restrictionContext =
plannerRestrictionContext->relationRestrictionContext;
@ -267,50 +280,34 @@ SafeToPushdownUnionSubquery(PlannerRestrictionContext *plannerRestrictionContext
RelationRestriction *relationRestriction = lfirst(relationRestrictionCell);
Index partitionKeyIndex = InvalidAttrNumber;
PlannerInfo *relationPlannerRoot = relationRestriction->plannerInfo;
List *targetList = relationPlannerRoot->parse->targetList;
List *appendRelList = relationPlannerRoot->append_rel_list;
Var *varToBeAdded = NULL;
TargetEntry *targetEntryToAdd = NULL;
int targetRTEIndex = GetRTEIdentity(relationRestriction->rte);
Var *varToBeAdded =
PartitionKeyForRTEIdentityInQuery(originalQuery, targetRTEIndex,
&partitionKeyIndex);
/* union does not have partition key in the target list */
if (partitionKeyIndex == 0)
{
continue;
}
/*
* We first check whether UNION ALLs are pulled up or not. Note that Postgres
* planner creates AppendRelInfos per each UNION ALL query that is pulled up.
* Then, postgres stores the related information in the append_rel_list on the
* plannerInfo struct.
* This should never happen but to be on the safe side, we have this
*/
if (appendRelList != NULL)
if (relationPlannerRoot->simple_rel_array_size < relationRestriction->index)
{
varToBeAdded = FindUnionAllVar(relationPlannerRoot,
relationRestriction->translatedVars,
relationRestriction->relationId,
relationRestriction->index,
&partitionKeyIndex);
/* union does not have partition key in the target list */
if (partitionKeyIndex == 0)
{
continue;
}
continue;
}
else
{
partitionKeyIndex =
RelationRestrictionPartitionKeyIndex(relationRestriction);
/* union does not have partition key in the target list */
if (partitionKeyIndex == 0)
{
continue;
}
/*
* We update the varno because we use the original parse tree for finding the
* var. However the rest of the code relies on a query tree that might be different
* than the original parse tree because of postgres optimizations.
* That's why we update the varno to reflect the rteIndex in the modified query tree.
*/
varToBeAdded->varno = relationRestriction->index;
targetEntryToAdd = list_nth(targetList, partitionKeyIndex - 1);
if (!IsA(targetEntryToAdd->expr, Var))
{
continue;
}
varToBeAdded = (Var *) targetEntryToAdd->expr;
}
/*
* The current relation does not have its partition key in the target list.
@ -1756,20 +1753,42 @@ ContainsUnionSubquery(Query *queryTree)
/*
* RelationRestrictionPartitionKeyIndex gets a relation restriction and finds the
* index that the partition key of the relation exists in the query. The query is
* found in the planner info of the relation restriction.
* PartitionKeyForRTEIdentityInQuery finds the partition key var(if exists),
* in the given original query for the rte that has targetRTEIndex.
*/
static Index
RelationRestrictionPartitionKeyIndex(RelationRestriction *relationRestriction)
static Var *
PartitionKeyForRTEIdentityInQuery(Query *originalQuery, int targetRTEIndex,
Index *partitionKeyIndex)
{
Query *originalQueryContainingRTEIdentity =
FindQueryContainingRTEIdentity(originalQuery, targetRTEIndex);
if (!originalQueryContainingRTEIdentity)
{
/*
* We should always find the query but we have this check for sanity.
* This check makes sure that if there is a bug while finding the query,
* we don't get a crash etc. and the only downside will be we might be recursively
* planning a query that could be pushed down.
*/
return NULL;
}
/*
* This approach fails to detect when
* the top level query might have the column indexes in different order:
* explain
* SELECT count(*) FROM
* (
* SELECT user_id,value_2 FROM events_table
* UNION
* SELECT value_2, user_id FROM (SELECT user_id, value_2, random() FROM events_table) as foo
* ) foobar;
* So we hit https://github.com/citusdata/citus/issues/5093.
*/
List *relationTargetList = originalQueryContainingRTEIdentity->targetList;
ListCell *targetEntryCell = NULL;
Index partitionKeyTargetAttrIndex = 0;
PlannerInfo *relationPlannerRoot = relationRestriction->plannerInfo;
Query *relationPlannerParseQuery = relationPlannerRoot->parse;
List *relationTargetList = relationPlannerParseQuery->targetList;
foreach(targetEntryCell, relationTargetList)
{
TargetEntry *targetEntry = (TargetEntry *) lfirst(targetEntryCell);
@ -1777,20 +1796,93 @@ RelationRestrictionPartitionKeyIndex(RelationRestriction *relationRestriction)
partitionKeyTargetAttrIndex++;
bool skipOuterVars = false;
if (!targetEntry->resjunk &&
IsA(targetExpression, Var) &&
IsPartitionColumn(targetExpression, relationPlannerParseQuery))
IsPartitionColumn(targetExpression, originalQueryContainingRTEIdentity,
skipOuterVars))
{
Var *targetColumn = (Var *) targetExpression;
if (targetColumn->varno == relationRestriction->index)
/*
* We find the referenced table column to support distribution
* columns that are correlated.
*/
RangeTblEntry *rteContainingPartitionKey = NULL;
FindReferencedTableColumn(targetExpression, NIL,
originalQueryContainingRTEIdentity,
&targetColumn,
&rteContainingPartitionKey,
skipOuterVars);
if (rteContainingPartitionKey->rtekind == RTE_RELATION &&
GetRTEIdentity(rteContainingPartitionKey) == targetRTEIndex)
{
return partitionKeyTargetAttrIndex;
*partitionKeyIndex = partitionKeyTargetAttrIndex;
return (Var *) copyObject(targetColumn);
}
}
}
return InvalidAttrNumber;
return NULL;
}
/*
 * FindQueryContainingRTEIdentity finds the query/subquery that has a
 * relation RTE whose RTE identity (see GetRTEIdentity) equals rteIndex
 * in its rtable.
 *
 * Despite its name, rteIndex is compared against GetRTEIdentity() of the
 * visited RTEs, not against a position in a range table.
 *
 * Returns the matching Query, or NULL when no relation RTE with the given
 * identity is found under the given query.
 */
static Query *
FindQueryContainingRTEIdentity(Query *query, int rteIndex)
{
	/*
	 * The walker context is only needed for the duration of this call, so
	 * keep it on the stack instead of allocating it with palloc0 and never
	 * freeing it. The query field starts as NULL and the walker restores it
	 * to NULL when nothing matches.
	 */
	FindQueryContainingRteIdentityContext findRteIdentityContext = {
		.targetRTEIdentity = rteIndex,
		.query = NULL
	};

	FindQueryContainingRTEIdentityInternal((Node *) query, &findRteIdentityContext);

	return findRteIdentityContext.query;
}
/*
 * FindQueryContainingRTEIdentityInternal is the tree walker behind
 * FindQueryContainingRTEIdentity. It returns true as soon as it visits a
 * relation RTE whose RTE identity equals context->targetRTEIdentity.
 *
 * While walking, context->query tracks the Query node currently being
 * visited. On a successful match it is left pointing at the query that
 * contains the matching RTE; when a query is walked without a match, the
 * previous value of context->query is restored.
 */
static bool
FindQueryContainingRTEIdentityInternal(Node *node,
									   FindQueryContainingRteIdentityContext *context)
{
	if (node == NULL)
	{
		return false;
	}

	if (IsA(node, RangeTblEntry))
	{
		RangeTblEntry *rangeTableEntry = (RangeTblEntry *) node;

		/* only plain relation RTEs are compared against the target identity */
		return rangeTableEntry->rtekind == RTE_RELATION &&
			   GetRTEIdentity(rangeTableEntry) == context->targetRTEIdentity;
	}

	if (!IsA(node, Query))
	{
		/* any other node type: keep descending through the expression tree */
		return expression_tree_walker(node, FindQueryContainingRTEIdentityInternal,
									  context);
	}

	Query *currentQuery = (Query *) node;
	Query *enclosingQuery = context->query;

	/* RTEs reached from here on belong to currentQuery */
	context->query = currentQuery;

	bool found = query_tree_walker(currentQuery,
								   FindQueryContainingRTEIdentityInternal, context,
								   QTW_EXAMINE_RTES_BEFORE);
	if (!found)
	{
		/* nothing matched below this query, undo the bookkeeping */
		context->query = enclosingQuery;
	}

	return found;
}

View File

@ -171,9 +171,11 @@ extern bool GroupedByColumn(List *groupClauseList, List *targetList, Var *column
extern List * SubqueryMultiTableList(MultiNode *multiNode);
extern List * GroupTargetEntryList(List *groupClauseList, List *targetEntryList);
extern bool ExtractQueryWalker(Node *node, List **queryList);
extern bool IsPartitionColumn(Expr *columnExpression, Query *query);
extern bool IsPartitionColumn(Expr *columnExpression, Query *query, bool skipOuterVars);
extern void FindReferencedTableColumn(Expr *columnExpression, List *parentQueryList,
Query *query, Oid *relationId, Var **column);
Query *query, Var **column,
RangeTblEntry **rteContainingReferencedColumn,
bool skipOuterVars);
extern char * WorkerColumnName(AttrNumber resno);
extern bool IsGroupBySubsetOfDistinct(List *groupClauses, List *distinctClauses);
extern bool TargetListHasAggregates(List *targetEntryList);

View File

@ -20,7 +20,7 @@
extern bool AllDistributionKeysInQueryAreEqual(Query *originalQuery,
PlannerRestrictionContext *
plannerRestrictionContext);
extern bool SafeToPushdownUnionSubquery(PlannerRestrictionContext *
extern bool SafeToPushdownUnionSubquery(Query *originalQuery, PlannerRestrictionContext *
plannerRestrictionContext);
extern bool ContainsUnionSubquery(Query *queryTree);
extern bool RestrictionEquivalenceForPartitionKeys(PlannerRestrictionContext *

View File

@ -740,10 +740,7 @@ DEBUG: Wrapping relation "local" to a subquery
DEBUG: generating subplan XXX_1 for subquery SELECT id FROM local_dist_join_mixed.local WHERE true
DEBUG: Wrapping relation "local" to a subquery
DEBUG: generating subplan XXX_2 for subquery SELECT id FROM local_dist_join_mixed.local WHERE true
DEBUG: generating subplan XXX_3 for subquery SELECT id, name, created_at FROM (SELECT distributed.id, distributed.name, distributed.created_at FROM ((SELECT NULL::integer AS "dummy-1", local_1.id, NULL::integer AS "dummy-3", NULL::text AS title, NULL::integer AS "dummy-5" FROM (SELECT intermediate_result.id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(id bigint)) local_1) local JOIN local_dist_join_mixed.distributed USING (id))) fo
DEBUG: generating subplan XXX_4 for subquery SELECT id, name, created_at FROM (SELECT distributed.id, distributed.name, distributed.created_at FROM ((SELECT NULL::integer AS "dummy-1", local_1.id, NULL::integer AS "dummy-3", NULL::text AS title, NULL::integer AS "dummy-5" FROM (SELECT intermediate_result.id FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(id bigint)) local_1) local JOIN local_dist_join_mixed.distributed USING (id))) ba
DEBUG: generating subplan XXX_5 for subquery SELECT intermediate_result.id, intermediate_result.name, intermediate_result.created_at FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(id bigint, name text, created_at timestamp with time zone) UNION ALL SELECT intermediate_result.id, intermediate_result.name, intermediate_result.created_at FROM read_intermediate_result('XXX_4'::text, 'binary'::citus_copy_format) intermediate_result(id bigint, name text, created_at timestamp with time zone)
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT intermediate_result.id, intermediate_result.name, intermediate_result.created_at FROM read_intermediate_result('XXX_5'::text, 'binary'::citus_copy_format) intermediate_result(id bigint, name text, created_at timestamp with time zone)) bar
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT fo.id, fo.name, fo.created_at FROM (SELECT distributed.id, distributed.name, distributed.created_at FROM ((SELECT NULL::integer AS "dummy-1", local_1.id, NULL::integer AS "dummy-3", NULL::text AS title, NULL::integer AS "dummy-5" FROM (SELECT intermediate_result.id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(id bigint)) local_1) local JOIN local_dist_join_mixed.distributed USING (id))) fo UNION ALL SELECT ba.id, ba.name, ba.created_at FROM (SELECT distributed.id, distributed.name, distributed.created_at FROM ((SELECT NULL::integer AS "dummy-1", local_1.id, NULL::integer AS "dummy-3", NULL::text AS title, NULL::integer AS "dummy-5" FROM (SELECT intermediate_result.id FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(id bigint)) local_1) local JOIN local_dist_join_mixed.distributed USING (id))) ba) bar
count
---------------------------------------------------------------------
202

View File

@ -9,6 +9,22 @@ AS create_cmd FROM pg_available_extensions()
WHERE name = 'hll'
\gset
:create_cmd;
SELECT public.explain_has_distributed_subplan($$
EXPLAIN SELECT symbol_id,
HLL_ADD_AGG(HLL_HASH_BIGINT(event_id)) AS event_hll_hash,
HLL_CARDINALITY(HLL_ADD_AGG(HLL_HASH_BIGINT(event_id))) AS event_n_users
FROM (
SELECT event_time, composite_id, event_id, 4640476 symbol_id FROM "events"
UNION ALL
SELECT event_time, composite_id, event_id, 4640477 symbol_id FROM "events"
) pushdown_events
GROUP BY symbol_id;
$$);
explain_has_distributed_subplan
---------------------------------------------------------------------
f
(1 row)
SET citus.coordinator_aggregation_strategy TO 'disabled';
-- Try to execute count(distinct) when approximate distincts aren't enabled
SELECT count(distinct l_orderkey) FROM lineitem;

View File

@ -14,6 +14,31 @@ WHERE name = 'hll'
f
(1 row)
SELECT public.explain_has_distributed_subplan($$
EXPLAIN SELECT symbol_id,
HLL_ADD_AGG(HLL_HASH_BIGINT(event_id)) AS event_hll_hash,
HLL_CARDINALITY(HLL_ADD_AGG(HLL_HASH_BIGINT(event_id))) AS event_n_users
FROM (
SELECT event_time, composite_id, event_id, 4640476 symbol_id FROM "events"
UNION ALL
SELECT event_time, composite_id, event_id, 4640477 symbol_id FROM "events"
) pushdown_events
GROUP BY symbol_id;
$$);
ERROR: function hll_hash_bigint(bigint) does not exist
HINT: No function matches the given name and argument types. You might need to add explicit type casts.
QUERY:
EXPLAIN SELECT symbol_id,
HLL_ADD_AGG(HLL_HASH_BIGINT(event_id)) AS event_hll_hash,
HLL_CARDINALITY(HLL_ADD_AGG(HLL_HASH_BIGINT(event_id))) AS event_n_users
FROM (
SELECT event_time, composite_id, event_id, 4640476 symbol_id FROM "events"
UNION ALL
SELECT event_time, composite_id, event_id, 4640477 symbol_id FROM "events"
) pushdown_events
GROUP BY symbol_id;
CONTEXT: PL/pgSQL function explain_has_distributed_subplan(text) line XX at FOR over EXECUTE statement
SET citus.coordinator_aggregation_strategy TO 'disabled';
-- Try to execute count(distinct) when approximate distincts aren't enabled
SELECT count(distinct l_orderkey) FROM lineitem;

View File

@ -860,8 +860,10 @@ $$);
(1 row)
-- #4781
CREATE TABLE test_a (id int, k int);
CREATE TABLE test_b (id int, k int);
CREATE TABLE test_a (a int, b int, id int, k int);
CREATE TABLE test_b (a int, b int, id int, k int);
ALTER TABLE test_a DROP column a;
ALTER TABLE test_b DROP column a;
SELECT create_distributed_table('test_a','id');
create_distributed_table
---------------------------------------------------------------------
@ -874,6 +876,123 @@ SELECT create_distributed_table('test_b','id');
(1 row)
-- try with composite types
CREATE TYPE comp_type AS (
int_field_1 BIGINT,
int_field_2 BIGINT
);
CREATE TABLE range_dist_table_2 (dist_col comp_type);
SELECT create_distributed_table('range_dist_table_2', 'dist_col', 'range');
create_distributed_table
---------------------------------------------------------------------
(1 row)
CALL public.create_range_partitioned_shards('range_dist_table_2',
'{"(10,24)","(10,58)",
"(10,90)","(20,100)"}',
'{"(10,25)","(10,65)",
"(10,99)","(20,100)"}');
INSERT INTO range_dist_table_2 VALUES ((10, 24));
INSERT INTO range_dist_table_2 VALUES ((10, 60));
INSERT INTO range_dist_table_2 VALUES ((10, 91));
INSERT INTO range_dist_table_2 VALUES ((20, 100));
-- the following can be pushed down
CREATE OR REPLACE VIEW v2 AS SELECT * from range_dist_table_2 UNION ALL SELECT * from range_dist_table_2;
SELECT public.explain_has_distributed_subplan($$
EXPLAIN
SELECT COUNT(dist_col) FROM v2;
$$);
explain_has_distributed_subplan
---------------------------------------------------------------------
f
(1 row)
SELECT public.explain_has_distributed_subplan($$
EXPLAIN
SELECT COUNT(*) FROM v2;
$$);
explain_has_distributed_subplan
---------------------------------------------------------------------
f
(1 row)
DROP TABLE range_dist_table_2 cascade;
NOTICE: drop cascades to view v2
-- these should be pushed down.
SELECT public.explain_has_distributed_subplan($$
explain SELECT * FROM users_table_part u1 WHERE (user_id) IN
(
SELECT user_id FROM users_table_part
UNION
SELECT user_id FROM users_table_part
) ;
$$);
explain_has_distributed_subplan
---------------------------------------------------------------------
f
(1 row)
SELECT public.explain_has_distributed_subplan($$
explain SELECT * FROM users_table_part u1 WHERE (user_id) IN
(
SELECT user_id FROM (SELECT *, random() FROM users_table_part) as foo
UNION
SELECT user_id FROM (SELECT *, random() FROM users_table_part) as bar
);
$$);
explain_has_distributed_subplan
---------------------------------------------------------------------
f
(1 row)
CREATE OR REPLACE VIEW v AS SELECT * from test_a where k>1 UNION SELECT * from test_b where k<1;
-- tests with union
SELECT public.explain_has_distributed_subplan($$
EXPLAIN
SELECT COUNT(*) FROM v;
$$);
explain_has_distributed_subplan
---------------------------------------------------------------------
f
(1 row)
SELECT public.explain_has_distributed_subplan($$
EXPLAIN
SELECT AVG(k) FROM v;
$$);
explain_has_distributed_subplan
---------------------------------------------------------------------
f
(1 row)
SELECT public.explain_has_distributed_subplan($$
EXPLAIN
SELECT SUM(k) FROM v;
$$);
explain_has_distributed_subplan
---------------------------------------------------------------------
f
(1 row)
SELECT public.explain_has_distributed_subplan($$
EXPLAIN
SELECT MAX(k) FROM v;
$$);
explain_has_distributed_subplan
---------------------------------------------------------------------
f
(1 row)
SELECT public.explain_has_distributed_subplan($$
EXPLAIN
SELECT COUNT(k) FROM v;
$$);
explain_has_distributed_subplan
---------------------------------------------------------------------
f
(1 row)
CREATE OR REPLACE VIEW v AS SELECT * from test_a where k>1 UNION ALL SELECT * from test_b where k<1;
-- the followings can be pushed down since dist_key is used in the aggregation
SELECT public.explain_has_distributed_subplan($$
@ -912,14 +1031,13 @@ $$);
f
(1 row)
-- cannot pushed down because postgres optimizes fields, needs to be fixed with #4781
SELECT public.explain_has_distributed_subplan($$
EXPLAIN
SELECT COUNT(k) FROM v;
$$);
explain_has_distributed_subplan
---------------------------------------------------------------------
t
f
(1 row)
SELECT public.explain_has_distributed_subplan($$
@ -928,7 +1046,7 @@ SELECT AVG(k) FROM v;
$$);
explain_has_distributed_subplan
---------------------------------------------------------------------
t
f
(1 row)
SELECT public.explain_has_distributed_subplan($$
@ -937,18 +1055,397 @@ SELECT SUM(k) FROM v;
$$);
explain_has_distributed_subplan
---------------------------------------------------------------------
t
f
(1 row)
SELECT public.explain_has_distributed_subplan($$
EXPLAIN
SELECT MAX(k) FROM v;
$$);
explain_has_distributed_subplan
---------------------------------------------------------------------
f
(1 row)
CREATE OR REPLACE VIEW v AS (SELECT * from (SELECT * from test_a)a1 where k>1) UNION ALL SELECT * from (SELECT * from test_b)b1 where k<1;
SELECT public.explain_has_distributed_subplan($$
EXPLAIN
SELECT COUNT(*) FROM v;
$$);
explain_has_distributed_subplan
---------------------------------------------------------------------
f
(1 row)
SELECT public.explain_has_distributed_subplan($$
EXPLAIN
SELECT AVG(k) FROM v;
$$);
explain_has_distributed_subplan
---------------------------------------------------------------------
f
(1 row)
SELECT public.explain_has_distributed_subplan($$
EXPLAIN
SELECT SUM(k) FROM v;
$$);
explain_has_distributed_subplan
---------------------------------------------------------------------
f
(1 row)
SELECT public.explain_has_distributed_subplan($$
EXPLAIN
SELECT MAX(k) FROM v;
$$);
explain_has_distributed_subplan
---------------------------------------------------------------------
f
(1 row)
CREATE OR REPLACE VIEW v AS SELECT * from test_a where k<1 UNION ALL SELECT * from test_b where k<1;
SELECT public.explain_has_distributed_subplan($$
EXPLAIN
SELECT COUNT(*) FROM v;
$$);
explain_has_distributed_subplan
---------------------------------------------------------------------
f
(1 row)
SELECT public.explain_has_distributed_subplan($$
EXPLAIN
SELECT AVG(k) FROM v;
$$);
explain_has_distributed_subplan
---------------------------------------------------------------------
f
(1 row)
SELECT public.explain_has_distributed_subplan($$
EXPLAIN
SELECT SUM(k) FROM v;
$$);
explain_has_distributed_subplan
---------------------------------------------------------------------
f
(1 row)
SELECT public.explain_has_distributed_subplan($$
EXPLAIN
SELECT MAX(k) FROM v;
$$);
explain_has_distributed_subplan
---------------------------------------------------------------------
f
(1 row)
SELECT public.explain_has_distributed_subplan($$
EXPLAIN
SELECT count(*) FROM
(
SELECT bar.user_id FROM users_table_part u1 JOIN LATERAL (SELECT u1.user_id FROM users_table_part u2 WHERE u2.user_id = u1.user_id) as bar ON (true)
UNION
SELECT bar.user_id FROM users_table_part u1 JOIN LATERAL (SELECT u1.user_id FROM users_table_part u2 WHERE u2.user_id = u1.user_id) as bar ON (true)
) as bar;
$$);
explain_has_distributed_subplan
---------------------------------------------------------------------
f
(1 row)
-- we hit the following error hence can't pushdown:
-- Complex subqueries and CTEs are not supported within a UNION
SELECT public.explain_has_distributed_subplan($$
EXPLAIN WITH cte AS (
SELECT * FROM
(
SELECT bar.user_id FROM users_table_part u1 JOIN LATERAL (SELECT u1.user_id FROM users_table_part u2 WHERE u2.user_id = u1.user_id) as bar ON (true)
UNION
SELECT bar.user_id FROM users_table_part u1 JOIN LATERAL (SELECT u1.user_id FROM users_table_part u2 WHERE u2.user_id = u1.user_id) as bar ON (true)
) as bar )
SELECT count(*) FROM
(
SELECT bar.user_id FROM cte u1 JOIN LATERAL (SELECT u1.user_id FROM cte u2 WHERE u2.user_id = u1.user_id) as bar ON (true)
UNION
SELECT bar.user_id FROM cte u1 JOIN LATERAL (SELECT u1.user_id FROM cte u2 WHERE u2.user_id = u1.user_id) as bar ON (true)
) as foo;
$$);
explain_has_distributed_subplan
---------------------------------------------------------------------
t
(1 row)
SELECT public.explain_has_distributed_subplan($$
EXPLAIN
SELECT count(*) FROM
(
SELECT bar.user_id FROM users_table_part u1 JOIN LATERAL (SELECT u1.user_id, random() FROM users_table_part u2 WHERE u2.user_id = u1.user_id) as bar ON (true)
UNION
SELECT bar.user_id FROM users_table_part u1 JOIN LATERAL (SELECT u1.user_id, random() FROM users_table_part u2 WHERE u2.user_id = u1.user_id) as bar ON (true)
) as bar;
$$);
explain_has_distributed_subplan
---------------------------------------------------------------------
f
(1 row)
SELECT public.explain_has_distributed_subplan($$
explain
SELECT count(*) FROM
(
SELECT bar.user_id FROM users_table_part u1 JOIN LATERAL (SELECT u2.user_id FROM users_table_part u2 WHERE u2.user_id = u1.user_id) as bar ON (true)
UNION
SELECT bar.user_id FROM users_table_part u1 JOIN LATERAL (SELECT u2.user_id FROM users_table_part u2 WHERE u2.user_id = u1.user_id) as bar ON (true)
) as bar;
$$);
explain_has_distributed_subplan
---------------------------------------------------------------------
f
(1 row)
SELECT public.explain_has_distributed_subplan($$
explain
SELECT bar.user_id FROM users_table_part u1 JOIN LATERAL (SELECT *,random() FROM (SELECT u1.user_id FROM users_table_part u2 WHERE u2.user_id = u1.user_id) as foo) as bar ON (true)
$$);
explain_has_distributed_subplan
---------------------------------------------------------------------
f
(1 row)
SELECT public.explain_has_distributed_subplan($$
EXPLAIN WITH cte_1 AS not materialized
(
SELECT user_id
FROM (
SELECT *
FROM users_table_part) AS l1
UNION ALL
SELECT user_id
FROM (
SELECT user_id
FROM users_table_part
GROUP BY user_id) AS l2
UNION ALL
SELECT user_id
FROM (
SELECT *
FROM users_table_part) AS l2
UNION ALL
SELECT user_id
FROM (
SELECT *
FROM users_table_part) AS l1
UNION
SELECT user_id
FROM (
SELECT user_id
FROM (
SELECT DISTINCT user_id
FROM users_table_part) aa ) AS fooo
UNION
SELECT user_id
FROM (
SELECT DISTINCT user_id
FROM users_table_part) AS l2 ), cte_2 AS NOT materialized
(
SELECT *
FROM cte_1), cte_3 AS NOT materialized
(
SELECT *
FROM cte_2), cte_4 AS
(
SELECT DISTINCT user_id
FROM cte_3)
SELECT count(*)
FROM (
SELECT user_id
FROM (
SELECT *
FROM users_table_part) AS l1
UNION ALL
SELECT user_id
FROM (
SELECT *
FROM users_table_part) AS l2
UNION ALL
SELECT user_id
FROM (
SELECT *,
random()
FROM users_table_part) AS l2
UNION ALL
SELECT user_id
FROM (
SELECT *
FROM users_table_part) AS l1
UNION
SELECT user_id
FROM (
SELECT user_id AS user_id
FROM users_table_part) AS l2
UNION
SELECT user_id
FROM (
SELECT *,
random()
FROM users_table_part) AS l2
UNION
SELECT user_id
FROM (
SELECT *
FROM cte_1) AS bar
UNION ALL
SELECT user_id
FROM cte_2
UNION
SELECT *
FROM (
SELECT user_id
FROM cte_2
GROUP BY user_id) AS bar
UNION
SELECT user_id
FROM cte_4
UNION
SELECT user_id
FROM cte_3 ) AS foo
JOIN lateral
(
SELECT *
FROM (
SELECT user_id
FROM (
SELECT *
FROM users_table_part) AS l1
UNION ALL
SELECT user_id
FROM (
SELECT *
FROM users_table_part) AS l2
UNION ALL
SELECT user_id
FROM (
SELECT *,
random()
FROM users_table_part) AS l2
UNION ALL
SELECT user_id
FROM (
SELECT *
FROM users_table_part) AS l1
UNION
SELECT user_id
FROM (
SELECT user_id AS user_id
FROM users_table_part) AS l2
UNION
SELECT user_id
FROM (
SELECT users_table_part.user_id,
random()
FROM users_table_part) AS l2
UNION
SELECT user_id
FROM (
SELECT *
FROM cte_1) AS bar
UNION ALL
SELECT user_id
FROM cte_2
UNION
SELECT *
FROM (
SELECT user_id
FROM cte_2
GROUP BY user_id) AS bar
UNION
SELECT user_id
FROM cte_3 ) AS subqu) AS bar
using (user_id)
WHERE (
foo.user_id) IN
(
SELECT user_id
FROM (
SELECT foo.user_id
FROM users_table_part u1
JOIN lateral
(
SELECT u1.user_id
FROM users_table_part u2
WHERE u2.user_id = u1.user_id) AS foo
ON (
true)
UNION
SELECT foo.user_id
FROM users_table_part u1
JOIN lateral
(
SELECT u1.user_id
FROM users_table_part u2
WHERE u2.user_id = u1.user_id) AS foo
ON (
true) ) AS bar
UNION
SELECT *
FROM cte_2
WHERE user_id IN
(
SELECT user_id
FROM users_table_part)
UNION
SELECT *
FROM cte_1 ) ;
$$);
explain_has_distributed_subplan
---------------------------------------------------------------------
f
(1 row)
-- we hit https://github.com/citusdata/citus/blob/f00c63c33daf3d16f06462626ca14732b141ae7a/src/backend/distributed/planner/relation_restriction_equivalence.c#L235-L242
SELECT public.explain_has_distributed_subplan($$
EXPLAIN SELECT * FROM users_table_part u1 WHERE (value_1, user_id) IN
(
SELECT u1.user_id, user_id FROM users_table_part
UNION
SELECT u1.user_id, user_id FROM users_table_part
);
$$);
ERROR: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns
CONTEXT: PL/pgSQL function public.explain_has_distributed_subplan(text) line XX at FOR over EXECUTE statement
SELECT public.explain_has_distributed_subplan($$
EXPLAIN SELECT * FROM users_table_part u1 WHERE (user_id) IN
(
SELECT foo.user_id FROM users_table_part JOIN users_table_part foo ON users_table_part.user_id = foo.user_id
UNION
SELECT foo.user_id FROM users_table_part JOIN users_table_part foo on users_table_part.user_id = foo.user_id
);
$$);
explain_has_distributed_subplan
---------------------------------------------------------------------
f
(1 row)
CREATE OR REPLACE VIEW v AS SELECT test_a.* from test_a where k>1 UNION ALL SELECT test_b.* from test_b where k<1;
SELECT public.explain_has_distributed_subplan($$
EXPLAIN SELECT COUNT(*) FROM v;
$$);
explain_has_distributed_subplan
---------------------------------------------------------------------
f
(1 row)
CREATE OR REPLACE VIEW v AS (SELECT * from (SELECT * from test_a)a1 where k>1) UNION ALL SELECT * from (SELECT * from test_b)b1 where k<1;
SELECT public.explain_has_distributed_subplan($$
EXPLAIN SELECT COUNT(*) FROM v;
$$);
explain_has_distributed_subplan
---------------------------------------------------------------------
f
(1 row)
-- order by prevents postgres from optimizing fields so can be pushed down
SELECT public.explain_has_distributed_subplan($$
EXPLAIN
@ -966,16 +1463,17 @@ SELECT k, COUNT(*) FROM v GROUP BY k ORDER BY k;
$$);
explain_has_distributed_subplan
---------------------------------------------------------------------
t
f
(1 row)
RESET client_min_messages;
DROP SCHEMA union_pushdown CASCADE;
NOTICE: drop cascades to 7 other objects
NOTICE: drop cascades to 8 other objects
DETAIL: drop cascades to table users_table_part
drop cascades to table events_table_part
drop cascades to table events_table_ref
drop cascades to table events_table_local
drop cascades to table test_a
drop cascades to table test_b
drop cascades to type comp_type
drop cascades to view v

View File

@ -14,6 +14,7 @@ test: multi_shard_update_delete recursive_dml_with_different_planners_executors
test: insert_select_repartition window_functions dml_recursive multi_insert_select_window
test: multi_insert_select_conflict citus_table_triggers
test: multi_row_insert insert_select_into_local_table multi_create_table_new_features
test: multi_agg_approximate_distinct
# following should not run in parallel because it relies on connection counts to workers
test: insert_select_connection_leak
@ -65,7 +66,7 @@ test: multi_subquery_in_where_reference_clause adaptive_executor propagate_set_c
# this should be run alone as it gets too many clients
test: join_pushdown
test: multi_subquery_union multi_subquery_in_where_clause multi_subquery_misc statement_cancel_error_message
test: multi_agg_distinct multi_agg_approximate_distinct multi_limit_clause_approximate multi_outer_join_reference multi_single_relation_subquery multi_prepare_plsql set_role_in_transaction
test: multi_agg_distinct multi_limit_clause_approximate multi_outer_join_reference multi_single_relation_subquery multi_prepare_plsql set_role_in_transaction
test: multi_reference_table multi_select_for_update relation_access_tracking pg13_with_ties
test: custom_aggregate_support aggregate_support tdigest_aggregate_support
test: multi_average_expression multi_working_columns multi_having_pushdown having_subquery

View File

@ -13,6 +13,18 @@ WHERE name = 'hll'
:create_cmd;
SELECT public.explain_has_distributed_subplan($$
EXPLAIN SELECT symbol_id,
HLL_ADD_AGG(HLL_HASH_BIGINT(event_id)) AS event_hll_hash,
HLL_CARDINALITY(HLL_ADD_AGG(HLL_HASH_BIGINT(event_id))) AS event_n_users
FROM (
SELECT event_time, composite_id, event_id, 4640476 symbol_id FROM "events"
UNION ALL
SELECT event_time, composite_id, event_id, 4640477 symbol_id FROM "events"
) pushdown_events
GROUP BY symbol_id;
$$);
SET citus.coordinator_aggregation_strategy TO 'disabled';
-- Try to execute count(distinct) when approximate distincts aren't enabled

View File

@ -655,11 +655,92 @@ LIMIT 1;
$$);
-- #4781
CREATE TABLE test_a (id int, k int);
CREATE TABLE test_b (id int, k int);
CREATE TABLE test_a (a int, b int, id int, k int);
CREATE TABLE test_b (a int, b int, id int, k int);
ALTER TABLE test_a DROP column a;
ALTER TABLE test_b DROP column a;
SELECT create_distributed_table('test_a','id');
SELECT create_distributed_table('test_b','id');
-- try with composite types
CREATE TYPE comp_type AS (
int_field_1 BIGINT,
int_field_2 BIGINT
);
CREATE TABLE range_dist_table_2 (dist_col comp_type);
SELECT create_distributed_table('range_dist_table_2', 'dist_col', 'range');
CALL public.create_range_partitioned_shards('range_dist_table_2',
'{"(10,24)","(10,58)",
"(10,90)","(20,100)"}',
'{"(10,25)","(10,65)",
"(10,99)","(20,100)"}');
INSERT INTO range_dist_table_2 VALUES ((10, 24));
INSERT INTO range_dist_table_2 VALUES ((10, 60));
INSERT INTO range_dist_table_2 VALUES ((10, 91));
INSERT INTO range_dist_table_2 VALUES ((20, 100));
-- the following can be pushed down
CREATE OR REPLACE VIEW v2 AS SELECT * from range_dist_table_2 UNION ALL SELECT * from range_dist_table_2;
SELECT public.explain_has_distributed_subplan($$
EXPLAIN
SELECT COUNT(dist_col) FROM v2;
$$);
SELECT public.explain_has_distributed_subplan($$
EXPLAIN
SELECT COUNT(*) FROM v2;
$$);
DROP TABLE range_dist_table_2 cascade;
-- these should be pushed down.
SELECT public.explain_has_distributed_subplan($$
explain SELECT * FROM users_table_part u1 WHERE (user_id) IN
(
SELECT user_id FROM users_table_part
UNION
SELECT user_id FROM users_table_part
) ;
$$);
SELECT public.explain_has_distributed_subplan($$
explain SELECT * FROM users_table_part u1 WHERE (user_id) IN
(
SELECT user_id FROM (SELECT *, random() FROM users_table_part) as foo
UNION
SELECT user_id FROM (SELECT *, random() FROM users_table_part) as bar
);
$$);
CREATE OR REPLACE VIEW v AS SELECT * from test_a where k>1 UNION SELECT * from test_b where k<1;
-- tests with union
SELECT public.explain_has_distributed_subplan($$
EXPLAIN
SELECT COUNT(*) FROM v;
$$);
SELECT public.explain_has_distributed_subplan($$
EXPLAIN
SELECT AVG(k) FROM v;
$$);
SELECT public.explain_has_distributed_subplan($$
EXPLAIN
SELECT SUM(k) FROM v;
$$);
SELECT public.explain_has_distributed_subplan($$
EXPLAIN
SELECT MAX(k) FROM v;
$$);
SELECT public.explain_has_distributed_subplan($$
EXPLAIN
SELECT COUNT(k) FROM v;
$$);
CREATE OR REPLACE VIEW v AS SELECT * from test_a where k>1 UNION ALL SELECT * from test_b where k<1;
-- the followings can be pushed down since dist_key is used in the aggregation
SELECT public.explain_has_distributed_subplan($$
@ -682,7 +763,6 @@ EXPLAIN
SELECT MAX(id) FROM v;
$$);
-- cannot pushed down because postgres optimizes fields, needs to be fixed with #4781
SELECT public.explain_has_distributed_subplan($$
EXPLAIN
SELECT COUNT(k) FROM v;
@ -703,6 +783,319 @@ EXPLAIN
SELECT MAX(k) FROM v;
$$);
CREATE OR REPLACE VIEW v AS (SELECT * from (SELECT * from test_a)a1 where k>1) UNION ALL SELECT * from (SELECT * from test_b)b1 where k<1;
SELECT public.explain_has_distributed_subplan($$
EXPLAIN
SELECT COUNT(*) FROM v;
$$);
SELECT public.explain_has_distributed_subplan($$
EXPLAIN
SELECT AVG(k) FROM v;
$$);
SELECT public.explain_has_distributed_subplan($$
EXPLAIN
SELECT SUM(k) FROM v;
$$);
SELECT public.explain_has_distributed_subplan($$
EXPLAIN
SELECT MAX(k) FROM v;
$$);
CREATE OR REPLACE VIEW v AS SELECT * from test_a where k<1 UNION ALL SELECT * from test_b where k<1;
SELECT public.explain_has_distributed_subplan($$
EXPLAIN
SELECT COUNT(*) FROM v;
$$);
SELECT public.explain_has_distributed_subplan($$
EXPLAIN
SELECT AVG(k) FROM v;
$$);
SELECT public.explain_has_distributed_subplan($$
EXPLAIN
SELECT SUM(k) FROM v;
$$);
SELECT public.explain_has_distributed_subplan($$
EXPLAIN
SELECT MAX(k) FROM v;
$$);
SELECT public.explain_has_distributed_subplan($$
EXPLAIN
SELECT count(*) FROM
(
SELECT bar.user_id FROM users_table_part u1 JOIN LATERAL (SELECT u1.user_id FROM users_table_part u2 WHERE u2.user_id = u1.user_id) as bar ON (true)
UNION
SELECT bar.user_id FROM users_table_part u1 JOIN LATERAL (SELECT u1.user_id FROM users_table_part u2 WHERE u2.user_id = u1.user_id) as bar ON (true)
) as bar;
$$);
-- we hit the following error hence can't pushdown:
-- Complex subqueries and CTEs are not supported within a UNION
SELECT public.explain_has_distributed_subplan($$
EXPLAIN WITH cte AS (
SELECT * FROM
(
SELECT bar.user_id FROM users_table_part u1 JOIN LATERAL (SELECT u1.user_id FROM users_table_part u2 WHERE u2.user_id = u1.user_id) as bar ON (true)
UNION
SELECT bar.user_id FROM users_table_part u1 JOIN LATERAL (SELECT u1.user_id FROM users_table_part u2 WHERE u2.user_id = u1.user_id) as bar ON (true)
) as bar )
SELECT count(*) FROM
(
SELECT bar.user_id FROM cte u1 JOIN LATERAL (SELECT u1.user_id FROM cte u2 WHERE u2.user_id = u1.user_id) as bar ON (true)
UNION
SELECT bar.user_id FROM cte u1 JOIN LATERAL (SELECT u1.user_id FROM cte u2 WHERE u2.user_id = u1.user_id) as bar ON (true)
) as foo;
$$);
SELECT public.explain_has_distributed_subplan($$
EXPLAIN
SELECT count(*) FROM
(
SELECT bar.user_id FROM users_table_part u1 JOIN LATERAL (SELECT u1.user_id, random() FROM users_table_part u2 WHERE u2.user_id = u1.user_id) as bar ON (true)
UNION
SELECT bar.user_id FROM users_table_part u1 JOIN LATERAL (SELECT u1.user_id, random() FROM users_table_part u2 WHERE u2.user_id = u1.user_id) as bar ON (true)
) as bar;
$$);
SELECT public.explain_has_distributed_subplan($$
explain
SELECT count(*) FROM
(
SELECT bar.user_id FROM users_table_part u1 JOIN LATERAL (SELECT u2.user_id FROM users_table_part u2 WHERE u2.user_id = u1.user_id) as bar ON (true)
UNION
SELECT bar.user_id FROM users_table_part u1 JOIN LATERAL (SELECT u2.user_id FROM users_table_part u2 WHERE u2.user_id = u1.user_id) as bar ON (true)
) as bar;
$$);
SELECT public.explain_has_distributed_subplan($$
explain
SELECT bar.user_id FROM users_table_part u1 JOIN LATERAL (SELECT *,random() FROM (SELECT u1.user_id FROM users_table_part u2 WHERE u2.user_id = u1.user_id) as foo) as bar ON (true)
$$);
SELECT public.explain_has_distributed_subplan($$
EXPLAIN WITH cte_1 AS not materialized
(
SELECT user_id
FROM (
SELECT *
FROM users_table_part) AS l1
UNION ALL
SELECT user_id
FROM (
SELECT user_id
FROM users_table_part
GROUP BY user_id) AS l2
UNION ALL
SELECT user_id
FROM (
SELECT *
FROM users_table_part) AS l2
UNION ALL
SELECT user_id
FROM (
SELECT *
FROM users_table_part) AS l1
UNION
SELECT user_id
FROM (
SELECT user_id
FROM (
SELECT DISTINCT user_id
FROM users_table_part) aa ) AS fooo
UNION
SELECT user_id
FROM (
SELECT DISTINCT user_id
FROM users_table_part) AS l2 ), cte_2 AS NOT materialized
(
SELECT *
FROM cte_1), cte_3 AS NOT materialized
(
SELECT *
FROM cte_2), cte_4 AS
(
SELECT DISTINCT user_id
FROM cte_3)
SELECT count(*)
FROM (
SELECT user_id
FROM (
SELECT *
FROM users_table_part) AS l1
UNION ALL
SELECT user_id
FROM (
SELECT *
FROM users_table_part) AS l2
UNION ALL
SELECT user_id
FROM (
SELECT *,
random()
FROM users_table_part) AS l2
UNION ALL
SELECT user_id
FROM (
SELECT *
FROM users_table_part) AS l1
UNION
SELECT user_id
FROM (
SELECT user_id AS user_id
FROM users_table_part) AS l2
UNION
SELECT user_id
FROM (
SELECT *,
random()
FROM users_table_part) AS l2
UNION
SELECT user_id
FROM (
SELECT *
FROM cte_1) AS bar
UNION ALL
SELECT user_id
FROM cte_2
UNION
SELECT *
FROM (
SELECT user_id
FROM cte_2
GROUP BY user_id) AS bar
UNION
SELECT user_id
FROM cte_4
UNION
SELECT user_id
FROM cte_3 ) AS foo
JOIN lateral
(
SELECT *
FROM (
SELECT user_id
FROM (
SELECT *
FROM users_table_part) AS l1
UNION ALL
SELECT user_id
FROM (
SELECT *
FROM users_table_part) AS l2
UNION ALL
SELECT user_id
FROM (
SELECT *,
random()
FROM users_table_part) AS l2
UNION ALL
SELECT user_id
FROM (
SELECT *
FROM users_table_part) AS l1
UNION
SELECT user_id
FROM (
SELECT user_id AS user_id
FROM users_table_part) AS l2
UNION
SELECT user_id
FROM (
SELECT users_table_part.user_id,
random()
FROM users_table_part) AS l2
UNION
SELECT user_id
FROM (
SELECT *
FROM cte_1) AS bar
UNION ALL
SELECT user_id
FROM cte_2
UNION
SELECT *
FROM (
SELECT user_id
FROM cte_2
GROUP BY user_id) AS bar
UNION
SELECT user_id
FROM cte_3 ) AS subqu) AS bar
using (user_id)
WHERE (
foo.user_id) IN
(
SELECT user_id
FROM (
SELECT foo.user_id
FROM users_table_part u1
JOIN lateral
(
SELECT u1.user_id
FROM users_table_part u2
WHERE u2.user_id = u1.user_id) AS foo
ON (
true)
UNION
SELECT foo.user_id
FROM users_table_part u1
JOIN lateral
(
SELECT u1.user_id
FROM users_table_part u2
WHERE u2.user_id = u1.user_id) AS foo
ON (
true) ) AS bar
UNION
SELECT *
FROM cte_2
WHERE user_id IN
(
SELECT user_id
FROM users_table_part)
UNION
SELECT *
FROM cte_1 ) ;
$$);
-- we hit https://github.com/citusdata/citus/blob/f00c63c33daf3d16f06462626ca14732b141ae7a/src/backend/distributed/planner/relation_restriction_equivalence.c#L235-L242
SELECT public.explain_has_distributed_subplan($$
EXPLAIN SELECT * FROM users_table_part u1 WHERE (value_1, user_id) IN
(
SELECT u1.user_id, user_id FROM users_table_part
UNION
SELECT u1.user_id, user_id FROM users_table_part
);
$$);
SELECT public.explain_has_distributed_subplan($$
EXPLAIN SELECT * FROM users_table_part u1 WHERE (user_id) IN
(
SELECT foo.user_id FROM users_table_part JOIN users_table_part foo ON users_table_part.user_id = foo.user_id
UNION
SELECT foo.user_id FROM users_table_part JOIN users_table_part foo on users_table_part.user_id = foo.user_id
);
$$);
CREATE OR REPLACE VIEW v AS SELECT test_a.* from test_a where k>1 UNION ALL SELECT test_b.* from test_b where k<1;
SELECT public.explain_has_distributed_subplan($$
EXPLAIN SELECT COUNT(*) FROM v;
$$);
CREATE OR REPLACE VIEW v AS (SELECT * from (SELECT * from test_a)a1 where k>1) UNION ALL SELECT * from (SELECT * from test_b)b1 where k<1;
SELECT public.explain_has_distributed_subplan($$
EXPLAIN SELECT COUNT(*) FROM v;
$$);
-- order by prevents postgres from optimizing fields so can be pushed down
SELECT public.explain_has_distributed_subplan($$
EXPLAIN