diff --git a/src/backend/distributed/planner/distributed_planner.c b/src/backend/distributed/planner/distributed_planner.c index 0040a4185..84cd3b7a9 100644 --- a/src/backend/distributed/planner/distributed_planner.c +++ b/src/backend/distributed/planner/distributed_planner.c @@ -2203,6 +2203,33 @@ CreateAndPushPlannerRestrictionContext(void) } +/* + * TranslatedVarsForRteIdentity gets an rteIdentity and returns the + * translatedVars that belong to the range table relation. If no + * translatedVars found, the function returns NIL; + */ +List * +TranslatedVarsForRteIdentity(int rteIdentity) +{ + PlannerRestrictionContext *currentPlannerRestrictionContext = + CurrentPlannerRestrictionContext(); + + List *relationRestrictionList = + currentPlannerRestrictionContext->relationRestrictionContext-> + relationRestrictionList; + RelationRestriction *relationRestriction = NULL; + foreach_ptr(relationRestriction, relationRestrictionList) + { + if (GetRTEIdentity(relationRestriction->rte) == rteIdentity) + { + return relationRestriction->translatedVars; + } + } + + return NIL; +} + + /* * CurrentRestrictionContext returns the most recently added * PlannerRestrictionContext from the plannerRestrictionContextList list. diff --git a/src/backend/distributed/planner/relation_restriction_equivalence.c b/src/backend/distributed/planner/relation_restriction_equivalence.c index 5c7556eaa..e7f1556be 100644 --- a/src/backend/distributed/planner/relation_restriction_equivalence.c +++ b/src/backend/distributed/planner/relation_restriction_equivalence.c @@ -61,6 +61,8 @@ typedef struct AttributeEquivalenceClass { uint32 equivalenceId; List *equivalentAttributes; + + Index unionQueryPartitionKeyIndex; } AttributeEquivalenceClass; /* @@ -163,6 +165,7 @@ static Relids QueryRteIdentities(Query *queryTree); static int ParentCountPriorToAppendRel(List *appendRelList, AppendRelInfo *appendRelInfo); #endif + /* * AllDistributionKeysInQueryAreEqual returns true if either * (i) there exists join in the query and all relations joined on their @@ -253,7 +256,7 @@ SafeToPushdownUnionSubquery(PlannerRestrictionContext *plannerRestrictionContext plannerRestrictionContext->relationRestrictionContext; JoinRestrictionContext *joinRestrictionContext = plannerRestrictionContext->joinRestrictionContext; - Index unionQueryPartitionKeyIndex = 0; + AttributeEquivalenceClass *attributeEquivalence = palloc0(sizeof(AttributeEquivalenceClass)); ListCell *relationRestrictionCell = NULL; @@ -328,11 +331,11 @@ SafeToPushdownUnionSubquery(PlannerRestrictionContext *plannerRestrictionContext * we check whether all the relations have partition keys in the * same position. */ - if (unionQueryPartitionKeyIndex == InvalidAttrNumber) + if ((attributeEquivalence)->unionQueryPartitionKeyIndex == InvalidAttrNumber) { - unionQueryPartitionKeyIndex = partitionKeyIndex; + (attributeEquivalence)->unionQueryPartitionKeyIndex = partitionKeyIndex; } - else if (unionQueryPartitionKeyIndex != partitionKeyIndex) + else if ((attributeEquivalence)->unionQueryPartitionKeyIndex != partitionKeyIndex) { continue; } @@ -431,6 +434,13 @@ static Var * FindUnionAllVar(PlannerInfo *root, List *translatedVars, Oid relationOid, Index relationRteIndex, Index *partitionKeyIndex) { + if (!IsCitusTableType(relationOid, STRICTLY_PARTITIONED_DISTRIBUTED_TABLE)) + { + /* we only care about hash and range partitioned tables */ + *partitionKeyIndex = 0; + return NULL; + } + Var *relationPartitionKey = DistPartitionKeyOrError(relationOid); AttrNumber childAttrNumber = 0; @@ -439,7 +449,6 @@ FindUnionAllVar(PlannerInfo *root, List *translatedVars, Oid relationOid, foreach(translatedVarCell, translatedVars) { Node *targetNode = (Node *) lfirst(translatedVarCell); - childAttrNumber++; if (!IsA(targetNode, Var)) @@ -586,7 +595,6 @@ GenerateAllAttributeEquivalences(PlannerRestrictionContext *plannerRestrictionCo JoinRestrictionContext *joinRestrictionContext = plannerRestrictionContext->joinRestrictionContext; - /* reset the equivalence id counter per call to prevent overflows */ attributeEquivalenceId = 1; @@ -1241,7 +1249,8 @@ static void AddRteSubqueryToAttributeEquivalenceClass(AttributeEquivalenceClass **attributeEquivalenceClass, RangeTblEntry *rangeTableEntry, - PlannerInfo *root, Var *varToBeAdded) + PlannerInfo *root, + Var *varToBeAdded) { RelOptInfo *baseRelOptInfo = find_base_rel(root, varToBeAdded->varno); Query *targetSubquery = GetTargetSubquery(root, rangeTableEntry, varToBeAdded); @@ -1383,12 +1392,71 @@ AddUnionAllSetOperationsToAttributeEquivalenceClass(AttributeEquivalenceClass ** continue; } int rtoffset = RangeTableOffsetCompat(root, appendRelInfo); + int childRelId = appendRelInfo->child_relid - rtoffset; - /* set the varno accordingly for this specific child */ - varToBeAdded->varno = appendRelInfo->child_relid - rtoffset; + if (root->simple_rel_array_size <= childRelId) + { + /* we prefer to return over an Assert or error to be defensive */ + return; + } - AddToAttributeEquivalenceClass(attributeEquivalenceClass, root, - varToBeAdded); + RangeTblEntry *rte = root->simple_rte_array[childRelId]; + if (rte->inh) + { + /* + * This code-path may require improvements. If a leaf of a UNION ALL + * (e.g., an entry in appendRelList) itself is another UNION ALL + * (e.g., rte->inh = true), the logic here might get into an infinite + * recursion. + * + * The downside of "continue" here is that certain UNION ALL queries + * that are safe to pushdown may not be pushed down. + */ + continue; + } + else if (rte->rtekind == RTE_RELATION) + { + Index partitionKeyIndex = 0; + List *translatedVars = TranslatedVarsForRteIdentity(GetRTEIdentity(rte)); + Var *varToBeAddedOnUnionAllSubquery = + FindUnionAllVar(root, translatedVars, rte->relid, childRelId, + &partitionKeyIndex); + if (partitionKeyIndex == 0) + { + /* no partition key on the target list */ + continue; + } + + if ((*attributeEquivalenceClass)->unionQueryPartitionKeyIndex == 0) + { + /* the first partition key index we found */ + (*attributeEquivalenceClass)->unionQueryPartitionKeyIndex = + partitionKeyIndex; + } + else if ((*attributeEquivalenceClass)->unionQueryPartitionKeyIndex != + partitionKeyIndex) + { + /* + * Partition keys on the leaves of the UNION ALL queries on + * different ordinal positions. We cannot pushdown, so skip. + */ + continue; + } + + if (varToBeAddedOnUnionAllSubquery != NULL) + { + AddToAttributeEquivalenceClass(attributeEquivalenceClass, root, + varToBeAddedOnUnionAllSubquery); + } + } + else + { + /* set the varno accordingly for this specific child */ + varToBeAdded->varno = childRelId; + + AddToAttributeEquivalenceClass(attributeEquivalenceClass, root, + varToBeAdded); + } } } diff --git a/src/include/distributed/distributed_planner.h b/src/include/distributed/distributed_planner.h index 34b9d5b0f..bc289ef0c 100644 --- a/src/include/distributed/distributed_planner.h +++ b/src/include/distributed/distributed_planner.h @@ -222,9 +222,9 @@ extern PlannedStmt * distributed_planner(Query *parse, #define LOCAL_TABLE_SUBQUERY_CTE_HINT \ "Use CTE's or subqueries to select from local tables and use them in joins" - extern List * ExtractRangeTableEntryList(Query *query); extern bool NeedsDistributedPlanning(Query *query); +extern List * TranslatedVarsForRteIdentity(int rteIdentity); extern struct DistributedPlan * GetDistributedPlan(CustomScan *node); extern void multi_relation_restriction_hook(PlannerInfo *root, RelOptInfo *relOptInfo, Index restrictionIndex, RangeTblEntry *rte); diff --git a/src/test/regress/expected/multi_test_helpers.out b/src/test/regress/expected/multi_test_helpers.out index fef5c7f40..9eba70f22 100644 --- a/src/test/regress/expected/multi_test_helpers.out +++ b/src/test/regress/expected/multi_test_helpers.out @@ -43,6 +43,20 @@ BEGIN END LOOP; RETURN false; END; $$ language plpgsql; +-- helper function that returns true if output of given explain has "is not null" (case in-sensitive) +CREATE OR REPLACE FUNCTION explain_has_distributed_subplan(explain_commmand text) +RETURNS BOOLEAN AS $$ +DECLARE + query_plan text; +BEGIN + FOR query_plan IN EXECUTE explain_commmand LOOP + IF query_plan ILIKE '%Distributed Subplan %_%' + THEN + RETURN true; + END IF; + END LOOP; + RETURN false; +END; $$ language plpgsql; -- helper function to quickly run SQL on the whole cluster CREATE OR REPLACE FUNCTION run_command_on_coordinator_and_workers(p_sql text) RETURNS void LANGUAGE plpgsql AS $$ diff --git a/src/test/regress/expected/set_operations.out b/src/test/regress/expected/set_operations.out index 472e5d05c..47cf831bc 100644 --- a/src/test/regress/expected/set_operations.out +++ b/src/test/regress/expected/set_operations.out @@ -317,6 +317,14 @@ DEBUG: Router planner cannot handle multi-shard select queries SELECT * FROM ((SELECT x, y FROM test) UNION ALL (SELECT y, x FROM test)) u ORDER BY 1,2; DEBUG: Router planner cannot handle multi-shard select queries +DEBUG: Router planner cannot handle multi-shard select queries +DEBUG: generating subplan XXX_1 for subquery SELECT x, y FROM recursive_union.test +DEBUG: Router planner cannot handle multi-shard select queries +DEBUG: generating subplan XXX_2 for subquery SELECT y, x FROM recursive_union.test +DEBUG: Creating router plan +DEBUG: generating subplan XXX_3 for subquery SELECT intermediate_result.x, intermediate_result.y FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(x integer, y integer) UNION ALL SELECT intermediate_result.y, intermediate_result.x FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(y integer, x integer) +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT x, y FROM (SELECT intermediate_result.x, intermediate_result.y FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(x integer, y integer)) u ORDER BY x, y +DEBUG: Creating router plan x | y --------------------------------------------------------------------- 1 | 1 diff --git a/src/test/regress/expected/union_pushdown.out b/src/test/regress/expected/union_pushdown.out index b63aa93ee..2691ff461 100644 --- a/src/test/regress/expected/union_pushdown.out +++ b/src/test/regress/expected/union_pushdown.out @@ -36,6 +36,16 @@ SELECT create_distributed_table('events_table_part', 'user_id'); (1 row) INSERT INTO events_table_part SELECT i, i %9, i %50 FROM generate_series(0, 100) i; +CREATE TABLE events_table_ref(user_id bigint, value_1 int, value_2 int); +SELECT create_reference_table('events_table_ref'); + create_reference_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO events_table_ref SELECT i, i %9, i %50 FROM generate_series(0, 100) i; +CREATE TABLE events_table_local(user_id bigint, value_1 int, value_2 int); +INSERT INTO events_table_local SELECT i, i %9, i %50 FROM generate_series(0, 100) i; set client_min_messages to DEBUG1; -- a union all query with 2 different levels of UNION ALL SELECT COUNT(*) @@ -286,8 +296,573 @@ DEBUG: push down of limit count: 1 0 (1 row) +-- safe to pushdown +SELECT * FROM ( + (SELECT user_id FROM users_table_part UNION ALL SELECT * FROM + (SELECT user_id FROM users_table_part UNION ALL SELECT user_id FROM users_table_part) as bar1) as foo + JOIN + (SELECT user_id FROM users_table_part UNION ALL SELECT * FROM + (SELECT user_id FROM users_table_part UNION ALL SELECT user_id FROM users_table_part) as bar2) as bar +USING (user_id) +) +ORDER BY 1 LIMIT 1; +DEBUG: push down of limit count: 1 + user_id +--------------------------------------------------------------------- + 0 +(1 row) + +-- UNION ALL leaf queries deep in the subquery +SELECT * FROM +( + SELECT * FROM (SELECT * FROM (SELECT * FROM (SELECT * FROM (SELECT * FROM (SELECT * FROM users_table_part) as level_5) as level_4) as level_3) as level_2) as level_1 + UNION ALL + SELECT * FROM (SELECT * FROM (SELECT * FROM (SELECT * FROM (SELECT * FROM (SELECT * FROM users_table_part) as level_5) as level_4) as level_3) as level_2) as level_1 +) as top_level ORDER BY 1 DESC LIMIT 3; +DEBUG: push down of limit count: 3 + user_id | value_1 | value_2 +--------------------------------------------------------------------- + 100 | 1 | 0 + 100 | 1 | 0 + 99 | 0 | 49 +(3 rows) + +-- UNION ALL leaf queries deep in the subquery +-- and random() calls prevent any pullup +SELECT user_id FROM +( + SELECT *,random() FROM (SELECT *,random() FROM (SELECT *,random() FROM (SELECT *,random() FROM (SELECT *,random() FROM (SELECT *,random() FROM users_table_part) as level_5) as level_4) as level_3) as level_2) as level_1 + UNION ALL + SELECT *,random() FROM (SELECT *,random() FROM (SELECT *,random() FROM (SELECT *,random() FROM (SELECT *,random() FROM (SELECT *,random() FROM users_table_part) as level_5) as level_4) as level_3) as level_2) as level_1 +) as top_level ORDER BY 1 DESC LIMIT 3; +DEBUG: push down of limit count: 3 + user_id +--------------------------------------------------------------------- + 100 + 100 + 99 +(3 rows) + +-- UNION ALL leaf queries deep in the subquery +-- joined with a table +SELECT * FROM +( + SELECT * FROM (SELECT * FROM (SELECT * FROM (SELECT * FROM (SELECT * FROM (SELECT * FROM users_table_part UNION ALL SELECT * FROM users_table_part) as level_5) as level_4) as level_3) as level_2) as level_1 + UNION ALL + SELECT * FROM (SELECT * FROM (SELECT * FROM (SELECT * FROM (SELECT * FROM (SELECT * FROM users_table_part UNION ALL SELECT * FROM users_table_part) as level_5) as level_4) as level_3) as level_2) as level_1 +) as top_level +JOIN +events_table_part USING(user_id) + ORDER BY 1 DESC LIMIT 3; +DEBUG: push down of limit count: 3 + user_id | value_1 | value_2 | value_1 | value_2 +--------------------------------------------------------------------- + 100 | 1 | 0 | 1 | 0 + 100 | 1 | 0 | 1 | 0 + 100 | 1 | 0 | 1 | 0 +(3 rows) + +-- UNION ALL leaf queries deep in the subquery +-- and random() calls prevent any pullup +SELECT user_id FROM +( + SELECT *,random() FROM (SELECT *,random() FROM (SELECT *,random() FROM (SELECT *,random() FROM (SELECT *,random() FROM (SELECT *,random() FROM users_table_part UNION ALL SELECT *,1 FROM users_table_part) as level_5) as level_4) as level_3) as level_2) as level_1 + UNION ALL + SELECT *,random() FROM (SELECT *,random() FROM (SELECT *,random() FROM (SELECT *,random() FROM (SELECT *,random() FROM (SELECT *,random() FROM users_table_part UNION ALL SELECT *,2 FROM users_table_part) as level_5) as level_4) as level_3) as level_2) as level_1 +) as top_level +JOIN +events_table_part USING(user_id) + ORDER BY 1 DESC LIMIT 3; +DEBUG: push down of limit count: 3 + user_id +--------------------------------------------------------------------- + 100 + 100 + 100 +(3 rows) + +-- a tree with [Q1.1 JOIN Q1.2 UNION ALL Q2.1 JOIN Q2.2] JOIN [Q3.1 JOIN Q3.2 UNION ALL Q4.1 JOIN Q4.2] +-- can be pushed down +SELECT * FROM ( + SELECT * FROM + ( + (((SELECT * FROM users_table_part JOIN events_table_part USING (user_id)) UNION ALL (SELECT * FROM users_table_part JOIN events_table_part USING (user_id))) as l1 + JOIN + ((SELECT * FROM users_table_part JOIN events_table_part USING (user_id)) UNION ALL (SELECT * FROM users_table_part JOIN events_table_part USING (user_id))) as l2 + USING(user_id)) + ) as left_subquery + UNION ALL + SELECT * FROM + ( + (((SELECT * FROM users_table_part JOIN events_table_part USING (user_id)) UNION ALL (SELECT * FROM users_table_part JOIN events_table_part USING (user_id))) as l1 + JOIN + ((SELECT * FROM users_table_part JOIN events_table_part USING (user_id)) UNION ALL (SELECT * FROM users_table_part JOIN events_table_part USING (user_id))) as l2 + USING(user_id)) + ) as right_subquery +) as top_level_left + JOIN +( + SELECT * FROM + ( + (((SELECT * FROM users_table_part JOIN events_table_part USING (user_id)) UNION ALL (SELECT * FROM users_table_part JOIN events_table_part USING (user_id))) as l1 + JOIN + ((SELECT * FROM users_table_part JOIN events_table_part USING (user_id)) UNION ALL (SELECT * FROM users_table_part JOIN events_table_part USING (user_id))) as l2 + USING(user_id)) + ) as left_subquery + UNION ALL + SELECT * FROM + ( + (((SELECT * FROM users_table_part JOIN events_table_part USING (user_id)) UNION ALL (SELECT * FROM users_table_part JOIN events_table_part USING (user_id))) as l1 + JOIN + ((SELECT * FROM users_table_part JOIN events_table_part USING (user_id)) UNION ALL (SELECT * FROM users_table_part JOIN events_table_part USING (user_id))) as l2 + USING(user_id)) + ) as right_subquery +) as top_level_righy USING (user_id) +ORDER BY user_id DESC +LIMIT 1; +DEBUG: push down of limit count: 1 + user_id | value_1 | value_2 | value_1 | value_2 | value_1 | value_2 | value_1 | value_2 | value_1 | value_2 | value_1 | value_2 | value_1 | value_2 | value_1 | value_2 +--------------------------------------------------------------------- + 100 | 1 | 0 | 1 | 0 | 1 | 0 | 1 | 0 | 1 | 0 | 1 | 0 | 1 | 0 | 1 | 0 +(1 row) + +-- a tree with [Q1.1 JOIN Q1.2 UNION ALL Q2.1 JOIN Q2.2] JOIN [Q3.1 JOIN Q3.2 UNION ALL Q4.1 JOIN Q4.2] +-- can be pushed down with reference tables +SELECT * FROM ( + SELECT * FROM + ( + (((SELECT * FROM users_table_part JOIN events_table_ref USING (user_id)) UNION ALL (SELECT * FROM users_table_part JOIN events_table_ref USING (user_id))) as l1 + JOIN + ((SELECT * FROM users_table_part JOIN events_table_ref USING (user_id)) UNION ALL (SELECT * FROM users_table_part JOIN events_table_ref USING (user_id))) as l2 + USING(user_id)) + ) as left_subquery + UNION ALL + SELECT * FROM + ( + (((SELECT * FROM users_table_part JOIN events_table_ref USING (user_id)) UNION ALL (SELECT * FROM users_table_part JOIN events_table_ref USING (user_id))) as l1 + JOIN + ((SELECT * FROM users_table_part JOIN events_table_ref USING (user_id)) UNION ALL (SELECT * FROM users_table_part JOIN events_table_ref USING (user_id))) as l2 + USING(user_id)) + ) as right_subquery +) as top_level_left + JOIN +( + SELECT * FROM + ( + (((SELECT * FROM users_table_part JOIN events_table_ref USING (user_id)) UNION ALL (SELECT * FROM users_table_part JOIN events_table_ref USING (user_id))) as l1 + JOIN + ((SELECT * FROM users_table_part JOIN events_table_ref USING (user_id)) UNION ALL (SELECT * FROM users_table_part JOIN events_table_ref USING (user_id))) as l2 + USING(user_id)) + ) as left_subquery + UNION ALL + SELECT * FROM + ( + (((SELECT * FROM users_table_part JOIN events_table_ref USING (user_id)) UNION ALL (SELECT * FROM users_table_part JOIN events_table_ref USING (user_id))) as l1 + JOIN + ((SELECT * FROM users_table_part JOIN events_table_ref USING (user_id)) UNION ALL (SELECT * FROM users_table_part JOIN events_table_ref USING (user_id))) as l2 + USING(user_id)) + ) as right_subquery +) as top_level_righy USING (user_id) +ORDER BY user_id DESC +LIMIT 1; +DEBUG: push down of limit count: 1 + user_id | value_1 | value_2 | value_1 | value_2 | value_1 | value_2 | value_1 | value_2 | value_1 | value_2 | value_1 | value_2 | value_1 | value_2 | value_1 | value_2 +--------------------------------------------------------------------- + 100 | 1 | 0 | 1 | 0 | 1 | 0 | 1 | 0 | 1 | 0 | 1 | 0 | 1 | 0 | 1 | 0 +(1 row) + +-- a tree with [Q1.1 JOIN Q1.2 UNION ALL Q2.1 JOIN Q2.2] JOIN [Q3.1 JOIN Q3.2 UNION ALL Q4.1 JOIN Q4.2] +-- can be pushed down with local tables after local tables have been recursively planned +SELECT * FROM ( + SELECT * FROM + ( + (((SELECT * FROM users_table_part JOIN events_table_local USING (user_id)) UNION ALL (SELECT * FROM users_table_part JOIN events_table_local USING (user_id))) as l1 + JOIN + ((SELECT * FROM users_table_part JOIN events_table_local USING (user_id)) UNION ALL (SELECT * FROM users_table_part JOIN events_table_local USING (user_id))) as l2 + USING(user_id)) + ) as left_subquery + UNION ALL + SELECT * FROM + ( + (((SELECT * FROM users_table_part JOIN events_table_local USING (user_id)) UNION ALL (SELECT * FROM users_table_part JOIN events_table_local USING (user_id))) as l1 + JOIN + ((SELECT * FROM users_table_part JOIN events_table_local USING (user_id)) UNION ALL (SELECT * FROM users_table_part JOIN events_table_local USING (user_id))) as l2 + USING(user_id)) + ) as right_subquery +) as top_level_left + JOIN +( + SELECT * FROM + ( + (((SELECT * FROM users_table_part JOIN events_table_local USING (user_id)) UNION ALL (SELECT * FROM users_table_part JOIN events_table_local USING (user_id))) as l1 + JOIN + ((SELECT * FROM users_table_part JOIN events_table_local USING (user_id)) UNION ALL (SELECT * FROM users_table_part JOIN events_table_local USING (user_id))) as l2 + USING(user_id)) + ) as left_subquery + UNION ALL + SELECT * FROM + ( + (((SELECT * FROM users_table_part JOIN events_table_local USING (user_id)) UNION ALL (SELECT * FROM users_table_part JOIN events_table_local USING (user_id))) as l1 + JOIN + ((SELECT * FROM users_table_part JOIN events_table_local USING (user_id)) UNION ALL (SELECT * FROM users_table_part JOIN events_table_local USING (user_id))) as l2 + USING(user_id)) + ) as right_subquery +) as top_level_righy USING (user_id) +ORDER BY user_id DESC +LIMIT 1; +DEBUG: Wrapping relation "events_table_local" to a subquery +DEBUG: generating subplan XXX_1 for subquery SELECT user_id, value_1, value_2 FROM union_pushdown.events_table_local WHERE true +DEBUG: Wrapping relation "events_table_local" to a subquery +DEBUG: generating subplan XXX_2 for subquery SELECT user_id, value_1, value_2 FROM union_pushdown.events_table_local WHERE true +DEBUG: Wrapping relation "events_table_local" to a subquery +DEBUG: generating subplan XXX_3 for subquery SELECT user_id, value_1, value_2 FROM union_pushdown.events_table_local WHERE true +DEBUG: Wrapping relation "events_table_local" to a subquery +DEBUG: generating subplan XXX_4 for subquery SELECT user_id, value_1, value_2 FROM union_pushdown.events_table_local WHERE true +DEBUG: Wrapping relation "events_table_local" to a subquery +DEBUG: generating subplan XXX_5 for subquery SELECT user_id, value_1, value_2 FROM union_pushdown.events_table_local WHERE true +DEBUG: Wrapping relation "events_table_local" to a subquery +DEBUG: generating subplan XXX_6 for subquery SELECT user_id, value_1, value_2 FROM union_pushdown.events_table_local WHERE true +DEBUG: Wrapping relation "events_table_local" to a subquery +DEBUG: generating subplan XXX_7 for subquery SELECT user_id, value_1, value_2 FROM union_pushdown.events_table_local WHERE true +DEBUG: Wrapping relation "events_table_local" to a subquery +DEBUG: generating subplan XXX_8 for subquery SELECT user_id, value_1, value_2 FROM union_pushdown.events_table_local WHERE true +DEBUG: Wrapping relation "events_table_local" to a subquery +DEBUG: generating subplan XXX_9 for subquery SELECT user_id, value_1, value_2 FROM union_pushdown.events_table_local WHERE true +DEBUG: Wrapping relation "events_table_local" to a subquery +DEBUG: generating subplan XXX_10 for subquery SELECT user_id, value_1, value_2 FROM union_pushdown.events_table_local WHERE true +DEBUG: Wrapping relation "events_table_local" to a subquery +DEBUG: generating subplan XXX_11 for subquery SELECT user_id, value_1, value_2 FROM union_pushdown.events_table_local WHERE true +DEBUG: Wrapping relation "events_table_local" to a subquery +DEBUG: generating subplan XXX_12 for subquery SELECT user_id, value_1, value_2 FROM union_pushdown.events_table_local WHERE true +DEBUG: Wrapping relation "events_table_local" to a subquery +DEBUG: generating subplan XXX_13 for subquery SELECT user_id, value_1, value_2 FROM union_pushdown.events_table_local WHERE true +DEBUG: Wrapping relation "events_table_local" to a subquery +DEBUG: generating subplan XXX_14 for subquery SELECT user_id, value_1, value_2 FROM union_pushdown.events_table_local WHERE true +DEBUG: Wrapping relation "events_table_local" to a subquery +DEBUG: generating subplan XXX_15 for subquery SELECT user_id, value_1, value_2 FROM union_pushdown.events_table_local WHERE true +DEBUG: Wrapping relation "events_table_local" to a subquery +DEBUG: generating subplan XXX_16 for subquery SELECT user_id, value_1, value_2 FROM union_pushdown.events_table_local WHERE true +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT top_level_left.user_id, top_level_left.value_1, top_level_left.value_2, top_level_left.value_1_1 AS value_1, top_level_left.value_2_1 AS value_2, top_level_left.value_1_2 AS value_1, top_level_left.value_2_2 AS value_2, top_level_left.value_1_3 AS value_1, top_level_left.value_2_3 AS value_2, top_level_righy.value_1, top_level_righy.value_2, top_level_righy.value_1_1 AS value_1, top_level_righy.value_2_1 AS value_2, top_level_righy.value_1_2 AS value_1, top_level_righy.value_2_2 AS value_2, top_level_righy.value_1_3 AS value_1, top_level_righy.value_2_3 AS value_2 FROM ((SELECT left_subquery.user_id, left_subquery.value_1, left_subquery.value_2, left_subquery.value_1_1 AS value_1, left_subquery.value_2_1 AS value_2, left_subquery.value_1_2 AS value_1, left_subquery.value_2_2 AS value_2, left_subquery.value_1_1_1 AS value_1, left_subquery.value_2_1_1 AS value_2 FROM ((SELECT users_table_part.user_id, users_table_part.value_1, users_table_part.value_2, events_table_local.value_1, events_table_local.value_2 FROM (union_pushdown.users_table_part JOIN (SELECT events_table_local_1.user_id, events_table_local_1.value_1, events_table_local_1.value_2 FROM (SELECT intermediate_result.user_id, intermediate_result.value_1, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(user_id bigint, value_1 integer, value_2 integer)) events_table_local_1) events_table_local USING (user_id)) UNION ALL SELECT users_table_part.user_id, users_table_part.value_1, users_table_part.value_2, events_table_local.value_1, events_table_local.value_2 FROM (union_pushdown.users_table_part JOIN (SELECT events_table_local_1.user_id, events_table_local_1.value_1, events_table_local_1.value_2 FROM (SELECT intermediate_result.user_id, intermediate_result.value_1, intermediate_result.value_2 FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(user_id bigint, value_1 integer, value_2 integer)) events_table_local_1) events_table_local USING (user_id))) l1(user_id, value_1, value_2, value_1_1, value_2_1) JOIN (SELECT users_table_part.user_id, users_table_part.value_1, users_table_part.value_2, events_table_local.value_1, events_table_local.value_2 FROM (union_pushdown.users_table_part JOIN (SELECT events_table_local_1.user_id, events_table_local_1.value_1, events_table_local_1.value_2 FROM (SELECT intermediate_result.user_id, intermediate_result.value_1, intermediate_result.value_2 FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(user_id bigint, value_1 integer, value_2 integer)) events_table_local_1) events_table_local USING (user_id)) UNION ALL SELECT users_table_part.user_id, users_table_part.value_1, users_table_part.value_2, events_table_local.value_1, events_table_local.value_2 FROM (union_pushdown.users_table_part JOIN (SELECT events_table_local_1.user_id, events_table_local_1.value_1, events_table_local_1.value_2 FROM (SELECT intermediate_result.user_id, intermediate_result.value_1, intermediate_result.value_2 FROM read_intermediate_result('XXX_4'::text, 'binary'::citus_copy_format) intermediate_result(user_id bigint, value_1 integer, value_2 integer)) events_table_local_1) events_table_local USING (user_id))) l2(user_id, value_1, value_2, value_1_1, value_2_1) USING (user_id)) left_subquery(user_id, value_1, value_2, value_1_1, value_2_1, value_1_2, value_2_2, value_1_1_1, value_2_1_1) UNION ALL SELECT right_subquery.user_id, right_subquery.value_1, right_subquery.value_2, right_subquery.value_1_1 AS value_1, right_subquery.value_2_1 AS value_2, right_subquery.value_1_2 AS value_1, right_subquery.value_2_2 AS value_2, right_subquery.value_1_1_1 AS value_1, right_subquery.value_2_1_1 AS value_2 FROM ((SELECT users_table_part.user_id, users_table_part.value_1, users_table_part.value_2, events_table_local.value_1, events_table_local.value_2 FROM (union_pushdown.users_table_part JOIN (SELECT events_table_local_1.user_id, events_table_local_1.value_1, events_table_local_1.value_2 FROM (SELECT intermediate_result.user_id, intermediate_result.value_1, intermediate_result.value_2 FROM read_intermediate_result('XXX_5'::text, 'binary'::citus_copy_format) intermediate_result(user_id bigint, value_1 integer, value_2 integer)) events_table_local_1) events_table_local USING (user_id)) UNION ALL SELECT users_table_part.user_id, users_table_part.value_1, users_table_part.value_2, events_table_local.value_1, events_table_local.value_2 FROM (union_pushdown.users_table_part JOIN (SELECT events_table_local_1.user_id, events_table_local_1.value_1, events_table_local_1.value_2 FROM (SELECT intermediate_result.user_id, intermediate_result.value_1, intermediate_result.value_2 FROM read_intermediate_result('XXX_6'::text, 'binary'::citus_copy_format) intermediate_result(user_id bigint, value_1 integer, value_2 integer)) events_table_local_1) events_table_local USING (user_id))) l1(user_id, value_1, value_2, value_1_1, value_2_1) JOIN (SELECT users_table_part.user_id, users_table_part.value_1, users_table_part.value_2, events_table_local.value_1, events_table_local.value_2 FROM (union_pushdown.users_table_part JOIN (SELECT events_table_local_1.user_id, events_table_local_1.value_1, events_table_local_1.value_2 FROM (SELECT intermediate_result.user_id, intermediate_result.value_1, intermediate_result.value_2 FROM read_intermediate_result('XXX_7'::text, 'binary'::citus_copy_format) intermediate_result(user_id bigint, value_1 integer, value_2 integer)) events_table_local_1) events_table_local USING (user_id)) UNION ALL SELECT users_table_part.user_id, users_table_part.value_1, users_table_part.value_2, events_table_local.value_1, events_table_local.value_2 FROM (union_pushdown.users_table_part JOIN (SELECT events_table_local_1.user_id, events_table_local_1.value_1, events_table_local_1.value_2 FROM (SELECT intermediate_result.user_id, intermediate_result.value_1, intermediate_result.value_2 FROM read_intermediate_result('XXX_8'::text, 'binary'::citus_copy_format) intermediate_result(user_id bigint, value_1 integer, value_2 integer)) events_table_local_1) events_table_local USING (user_id))) l2(user_id, value_1, value_2, value_1_1, value_2_1) USING (user_id)) right_subquery(user_id, value_1, value_2, value_1_1, value_2_1, value_1_2, value_2_2, value_1_1_1, value_2_1_1)) top_level_left(user_id, value_1, value_2, value_1_1, value_2_1, value_1_2, value_2_2, value_1_3, value_2_3) JOIN (SELECT left_subquery.user_id, left_subquery.value_1, left_subquery.value_2, left_subquery.value_1_1 AS value_1, left_subquery.value_2_1 AS value_2, left_subquery.value_1_2 AS value_1, left_subquery.value_2_2 AS value_2, left_subquery.value_1_1_1 AS value_1, left_subquery.value_2_1_1 AS value_2 FROM ((SELECT users_table_part.user_id, users_table_part.value_1, users_table_part.value_2, events_table_local.value_1, events_table_local.value_2 FROM (union_pushdown.users_table_part JOIN (SELECT events_table_local_1.user_id, events_table_local_1.value_1, events_table_local_1.value_2 FROM (SELECT intermediate_result.user_id, intermediate_result.value_1, intermediate_result.value_2 FROM read_intermediate_result('XXX_9'::text, 'binary'::citus_copy_format) intermediate_result(user_id bigint, value_1 integer, value_2 integer)) events_table_local_1) events_table_local USING (user_id)) UNION ALL SELECT users_table_part.user_id, users_table_part.value_1, users_table_part.value_2, events_table_local.value_1, events_table_local.value_2 FROM (union_pushdown.users_table_part JOIN (SELECT events_table_local_1.user_id, events_table_local_1.value_1, events_table_local_1.value_2 FROM (SELECT intermediate_result.user_id, intermediate_result.value_1, intermediate_result.value_2 FROM read_intermediate_result('XXX_10'::text, 'binary'::citus_copy_format) intermediate_result(user_id bigint, value_1 integer, value_2 integer)) events_table_local_1) events_table_local USING (user_id))) l1(user_id, value_1, value_2, value_1_1, value_2_1) JOIN (SELECT users_table_part.user_id, users_table_part.value_1, users_table_part.value_2, events_table_local.value_1, events_table_local.value_2 FROM (union_pushdown.users_table_part JOIN (SELECT events_table_local_1.user_id, events_table_local_1.value_1, events_table_local_1.value_2 FROM (SELECT intermediate_result.user_id, intermediate_result.value_1, intermediate_result.value_2 FROM read_intermediate_result('XXX_11'::text, 'binary'::citus_copy_format) intermediate_result(user_id bigint, value_1 integer, value_2 integer)) events_table_local_1) events_table_local USING (user_id)) UNION ALL SELECT users_table_part.user_id, users_table_part.value_1, users_table_part.value_2, events_table_local.value_1, events_table_local.value_2 FROM (union_pushdown.users_table_part JOIN (SELECT events_table_local_1.user_id, events_table_local_1.value_1, events_table_local_1.value_2 FROM (SELECT intermediate_result.user_id, intermediate_result.value_1, intermediate_result.value_2 FROM read_intermediate_result('XXX_12'::text, 'binary'::citus_copy_format) intermediate_result(user_id bigint, value_1 integer, value_2 integer)) events_table_local_1) events_table_local USING (user_id))) l2(user_id, value_1, value_2, value_1_1, value_2_1) USING (user_id)) left_subquery(user_id, value_1, value_2, value_1_1, value_2_1, value_1_2, value_2_2, value_1_1_1, value_2_1_1) UNION ALL SELECT right_subquery.user_id, right_subquery.value_1, right_subquery.value_2, right_subquery.value_1_1 AS value_1, right_subquery.value_2_1 AS value_2, right_subquery.value_1_2 AS value_1, right_subquery.value_2_2 AS value_2, right_subquery.value_1_1_1 AS value_1, right_subquery.value_2_1_1 AS value_2 FROM ((SELECT users_table_part.user_id, users_table_part.value_1, users_table_part.value_2, events_table_local.value_1, events_table_local.value_2 FROM (union_pushdown.users_table_part JOIN (SELECT events_table_local_1.user_id, events_table_local_1.value_1, events_table_local_1.value_2 FROM (SELECT intermediate_result.user_id, intermediate_result.value_1, intermediate_result.value_2 FROM read_intermediate_result('XXX_13'::text, 'binary'::citus_copy_format) intermediate_result(user_id bigint, value_1 integer, value_2 integer)) events_table_local_1) events_table_local USING (user_id)) UNION ALL SELECT users_table_part.user_id, users_table_part.value_1, users_table_part.value_2, events_table_local.value_1, events_table_local.value_2 FROM (union_pushdown.users_table_part JOIN (SELECT events_table_local_1.user_id, events_table_local_1.value_1, events_table_local_1.value_2 FROM (SELECT intermediate_result.user_id, intermediate_result.value_1, intermediate_result.value_2 FROM read_intermediate_result('XXX_14'::text, 'binary'::citus_copy_format) intermediate_result(user_id bigint, value_1 integer, value_2 integer)) events_table_local_1) events_table_local USING (user_id))) l1(user_id, value_1, value_2, value_1_1, value_2_1) JOIN (SELECT users_table_part.user_id, users_table_part.value_1, users_table_part.value_2, events_table_local.value_1, events_table_local.value_2 FROM (union_pushdown.users_table_part JOIN (SELECT events_table_local_1.user_id, events_table_local_1.value_1, events_table_local_1.value_2 FROM (SELECT intermediate_result.user_id, intermediate_result.value_1, intermediate_result.value_2 FROM read_intermediate_result('XXX_15'::text, 'binary'::citus_copy_format) intermediate_result(user_id bigint, value_1 integer, value_2 integer)) events_table_local_1) events_table_local USING (user_id)) UNION ALL SELECT users_table_part.user_id, users_table_part.value_1, users_table_part.value_2, events_table_local.value_1, events_table_local.value_2 FROM (union_pushdown.users_table_part JOIN (SELECT events_table_local_1.user_id, events_table_local_1.value_1, events_table_local_1.value_2 FROM (SELECT intermediate_result.user_id, intermediate_result.value_1, intermediate_result.value_2 FROM read_intermediate_result('XXX_16'::text, 'binary'::citus_copy_format) intermediate_result(user_id bigint, value_1 integer, value_2 integer)) events_table_local_1) events_table_local USING (user_id))) l2(user_id, value_1, value_2, value_1_1, value_2_1) USING (user_id)) right_subquery(user_id, value_1, value_2, value_1_1, value_2_1, value_1_2, value_2_2, value_1_1_1, value_2_1_1)) top_level_righy(user_id, value_1, value_2, value_1_1, value_2_1, value_1_2, value_2_2, value_1_3, value_2_3) USING (user_id)) ORDER BY top_level_left.user_id DESC LIMIT 1 +DEBUG: push down of limit count: 1 + user_id | value_1 | value_2 | value_1 | value_2 | value_1 | value_2 | value_1 | value_2 | value_1 | value_2 | value_1 | value_2 | value_1 | value_2 | value_1 | value_2 +--------------------------------------------------------------------- + 100 | 1 | 0 | 1 | 0 | 1 | 0 | 1 | 0 | 1 | 0 | 1 | 0 | 1 | 0 | 1 | 0 +(1 row) + +-- a subquery in WHERE clause with +-- a tree with [Q1.1 JOIN Q1.2 UNION ALL Q2.1 JOIN Q2.2] JOIN [Q3.1 JOIN Q3.2 UNION ALL Q4.1 JOIN Q4.2] +-- can be pushed down with FROM tree consisting of JOINs/UNION ALLs +SELECT * FROM +users_table_part u1 + JOIN +events_table_part e1 USING (user_id) + JOIN +users_table_part u2 USING (user_id) + JOIN +(SELECT * FROM users_table_part UNION ALL SELECT * FROM events_table_part) as foo USING (user_id) +WHERE user_id IN + (SELECT user_id FROM ( + SELECT * FROM + ( + (((SELECT * FROM users_table_part JOIN events_table_part USING (user_id)) UNION ALL (SELECT * FROM users_table_part JOIN events_table_part USING (user_id))) as l1 + JOIN + ((SELECT * FROM users_table_part JOIN events_table_part USING (user_id)) UNION ALL (SELECT * FROM users_table_part JOIN events_table_part USING (user_id))) as l2 + USING(user_id)) + ) as left_subquery + UNION ALL + SELECT * FROM + ( + (((SELECT * FROM users_table_part JOIN events_table_part USING (user_id)) UNION ALL (SELECT * FROM users_table_part JOIN events_table_part USING (user_id))) as l1 + JOIN + ((SELECT * FROM users_table_part JOIN events_table_part USING (user_id)) UNION ALL (SELECT * FROM users_table_part JOIN events_table_part USING (user_id))) as l2 + USING(user_id)) + ) as right_subquery +) as top_level_left + JOIN +( + SELECT * FROM + ( + (((SELECT * FROM users_table_part JOIN events_table_part USING (user_id)) UNION ALL (SELECT * FROM users_table_part JOIN events_table_part USING (user_id))) as l1 + JOIN + ((SELECT * FROM users_table_part JOIN events_table_part USING (user_id)) UNION ALL (SELECT * FROM users_table_part JOIN events_table_part USING (user_id))) as l2 + USING(user_id)) + ) as left_subquery + UNION ALL + SELECT * FROM + ( + (((SELECT * FROM users_table_part JOIN events_table_part USING (user_id)) UNION ALL (SELECT * FROM users_table_part JOIN events_table_part USING (user_id))) as l1 + JOIN + ((SELECT * FROM users_table_part JOIN events_table_part USING (user_id)) UNION ALL (SELECT * FROM users_table_part JOIN events_table_part USING (user_id))) as l2 + USING(user_id)) + ) as right_subquery +) as top_level_righy USING (user_id) +ORDER BY user_id DESC +) ORDER BY 1 LIMIT 1; +DEBUG: push down of limit count: 1 + user_id | value_1 | value_2 | value_1 | value_2 | value_1 | value_2 | value_1 | value_2 +--------------------------------------------------------------------- + 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 +(1 row) + +--------------------------------------------------------------------- +------------ The following tests ensure that we do not accidentally pushdown +------------ queries involving UNION ALL queries if the distribution keys do +------------ not match or any JOIN is not on the distribution key +------------ We used the queries that are defined above +--------------------------------------------------------------------- +RESET client_min_messages; +SELECT public.explain_has_distributed_subplan($$ + EXPLAIN SELECT * FROM ((SELECT 1 FROM events_table_part) UNION ALL (SELECT 1 FROM events_table_part)) u;$$); + explain_has_distributed_subplan +--------------------------------------------------------------------- + t +(1 row) + +SELECT public.explain_has_distributed_subplan($$ + EXPLAIN SELECT * FROM ((SELECT random() FROM events_table_part) UNION ALL (SELECT user_id FROM events_table_part)) u;$$); + explain_has_distributed_subplan +--------------------------------------------------------------------- + t +(1 row) + +SELECT public.explain_has_distributed_subplan($$ + EXPLAIN SELECT * FROM ((SELECT user_id FROM events_table_part) UNION ALL (SELECT user_id - 1 FROM events_table_part)) u;$$); + explain_has_distributed_subplan +--------------------------------------------------------------------- + t +(1 row) + +SELECT public.explain_has_distributed_subplan($$ + EXPLAIN SELECT * FROM ((SELECT user_id FROM events_table_part) UNION ALL (SELECT user_id - 1 as user_id FROM events_table_part)) u + JOIN users_table_part USING(user_id);$$); + explain_has_distributed_subplan +--------------------------------------------------------------------- + t +(1 row) + +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN SELECT * FROM +( + SELECT events_table_part.value_1 FROM users_table_part JOIN events_table_part USING (user_id) + UNION ALL + SELECT events_table_part.value_1 FROM users_table_part JOIN events_table_part USING (user_id) +) as bar;$$); + explain_has_distributed_subplan +--------------------------------------------------------------------- + t +(1 row) + +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN SELECT COUNT(*) +FROM + (SELECT user_id AS user_id + FROM + (SELECT value_1 AS user_id + FROM users_table_part + UNION ALL SELECT user_id AS user_id + FROM users_table_part) AS bar + UNION ALL SELECT user_id AS user_id + FROM users_table_part) AS fool$$); + explain_has_distributed_subplan +--------------------------------------------------------------------- + t +(1 row) + +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN SELECT COUNT(*) +FROM + (SELECT user_id AS user_id + FROM + (SELECT count(*) AS user_id + FROM users_table_part GROUP BY user_id + UNION ALL SELECT user_id AS user_id + FROM users_table_part) AS bar + UNION ALL SELECT user_id AS user_id + FROM users_table_part) AS fool$$); + explain_has_distributed_subplan +--------------------------------------------------------------------- + t +(1 row) + +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN SELECT * FROM +( + SELECT * FROM (SELECT * FROM (SELECT * FROM (SELECT * FROM (SELECT * FROM (SELECT value_1, user_id FROM users_table_part) as level_5) as level_4) as level_3) as level_2) as level_1 + UNION ALL + SELECT * FROM (SELECT * FROM (SELECT * FROM (SELECT * FROM (SELECT * FROM (SELECT user_id, value_1 FROM users_table_part) as level_5) as level_4) as level_3) as level_2) as level_1 +) as top_level$$); + explain_has_distributed_subplan +--------------------------------------------------------------------- + t +(1 row) + +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN SELECT user_id FROM +( + SELECT *,random() FROM (SELECT *,random() FROM (SELECT *,random() FROM (SELECT *,random() FROM (SELECT *,random() FROM (SELECT value_1 as user_id,random() FROM users_table_part) as level_5) as level_4) as level_3) as level_2) as level_1 + UNION ALL + SELECT *,random() FROM (SELECT *,random() FROM (SELECT *,random() FROM (SELECT *,random() FROM (SELECT *,random() FROM (SELECT user_id,random() FROM users_table_part) as level_5) as level_4) as level_3) as level_2) as level_1 +) as top_level +$$); + explain_has_distributed_subplan +--------------------------------------------------------------------- + t +(1 row) + +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN SELECT * FROM + ( + (((SELECT users_table_part.value_1 as user_id FROM users_table_part JOIN events_table_part USING (user_id)) UNION ALL (SELECT users_table_part.user_id as user_id FROM users_table_part JOIN events_table_part USING (user_id))) as l1 + JOIN + ((SELECT users_table_part.value_1 as user_id FROM users_table_part JOIN events_table_part USING (user_id)) UNION ALL (SELECT users_table_part.user_id as user_id FROM users_table_part JOIN events_table_part USING (user_id))) as l2 + USING(user_id)) + ) as left_subquery + UNION ALL + SELECT * FROM + ( + (((SELECT users_table_part.value_1 as user_id FROM users_table_part JOIN events_table_part USING (user_id)) UNION ALL (SELECT users_table_part.user_id as user_id FROM users_table_part JOIN events_table_part USING (user_id))) as l1 + JOIN + ((SELECT users_table_part.value_1 as user_id FROM users_table_part JOIN events_table_part USING (user_id)) UNION ALL (SELECT users_table_part.user_id as user_id FROM users_table_part JOIN events_table_part USING (user_id))) as l2 + USING(user_id)) + ) as right_subquery +$$); + explain_has_distributed_subplan +--------------------------------------------------------------------- + t +(1 row) + +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN SELECT * FROM +( + SELECT * FROM (SELECT * FROM (SELECT * FROM (SELECT * FROM (SELECT * FROM (SELECT user_id, value_1 FROM users_table_part UNION ALL SELECT user_id, value_1 FROM users_table_part) as level_5) as level_4) as level_3) as level_2) as level_1 + UNION ALL + SELECT * FROM (SELECT * FROM (SELECT * FROM (SELECT * FROM (SELECT * FROM (SELECT value_1, user_id FROM users_table_part UNION ALL SELECT value_1, user_id FROM users_table_part) as level_5) as level_4) as level_3) as level_2) as level_1 +) as top_level +JOIN +events_table_part USING(user_id) + ORDER BY 1 DESC LIMIT 3; +$$); + explain_has_distributed_subplan +--------------------------------------------------------------------- + t +(1 row) + +-- we can pushdown UNION ALL queries that are correlated and exists +-- on the SELECT clause +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN + SELECT + (SELECT count(*) FROM users_table_part WHERE user_id = e.user_id + UNION ALL + SELECT count(*) FROM users_table_part WHERE user_id = e.user_id) +FROM + (SELECT * FROM users_table_part UNION ALL SELECT * FROM users_table_part) as e; +$$); + explain_has_distributed_subplan +--------------------------------------------------------------------- + f +(1 row) + +-- even if the UNION ALL is not on the distribution key +-- it is safe to pushdown the query because all tables are joined +-- on the distribution keys +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN + SELECT + (SELECT user_id FROM users_table_part WHERE user_id = e.user_id + UNION ALL + SELECT value_1 FROM users_table_part WHERE user_id = e.user_id) +FROM + (SELECT * FROM users_table_part UNION ALL SELECT * FROM users_table_part) as e; +$$); + explain_has_distributed_subplan +--------------------------------------------------------------------- + f +(1 row) + +-- but if the join is not on the distribution key +-- Citus throws an error + EXPLAIN + SELECT + (SELECT user_id FROM users_table_part WHERE user_id = e.value_1 + UNION ALL + SELECT user_id FROM users_table_part WHERE user_id = e.value_1) + FROM + (SELECT * FROM users_table_part) as e; +ERROR: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns +-- correlated subquery should be able to pushdown +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN +SELECT * FROM +users_table_part e JOIN LATERAL +(SELECT value_1 FROM users_table_part WHERE user_id = e.user_id + UNION ALL + SELECT value_1 FROM users_table_part WHERE user_id = e.user_id) as foo ON (true); +$$); + explain_has_distributed_subplan +--------------------------------------------------------------------- + f +(1 row) + +-- correlated subquery should be able to pushdown +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN +SELECT + (SELECT + avg(count) + FROM + (SELECT count(*) as count from users_table_part where users_table_part.user_id = u_low.user_id + UNION ALL + SELECT count(*) from users_table_part where users_table_part.user_id = u_low.user_id) b) + FROM users_table_part u_low; +$$); + explain_has_distributed_subplan +--------------------------------------------------------------------- + f +(1 row) + +-- we cannot pushdown if one side of the UNION ALL +-- is a reference table +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN +SELECT * +FROM + (SELECT * + FROM events_table_ref + UNION ALL SELECT events_table_ref.* + FROM events_table_part + JOIN events_table_ref USING(user_id)) AS foo +JOIN users_table_part USING(user_id) +LIMIT 1; +$$); + explain_has_distributed_subplan +--------------------------------------------------------------------- + t +(1 row) + +-- we cannot pushdown if one side of the UNION ALL +-- is a local table +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN +SELECT * +FROM + (SELECT * + FROM events_table_local + UNION ALL SELECT events_table_local.* + FROM events_table_part + JOIN events_table_local USING(user_id)) AS foo +JOIN users_table_part USING(user_id) +LIMIT 1; +$$); + explain_has_distributed_subplan +--------------------------------------------------------------------- + t +(1 row) + RESET client_min_messages; DROP SCHEMA union_pushdown CASCADE; -NOTICE: drop cascades to 2 other objects +NOTICE: drop cascades to 4 other objects DETAIL: drop cascades to table users_table_part drop cascades to table events_table_part +drop cascades to table events_table_ref +drop cascades to table events_table_local diff --git a/src/test/regress/sql/multi_test_helpers.sql b/src/test/regress/sql/multi_test_helpers.sql index 7c37a940e..ad1c0e46f 100644 --- a/src/test/regress/sql/multi_test_helpers.sql +++ b/src/test/regress/sql/multi_test_helpers.sql @@ -48,6 +48,21 @@ BEGIN RETURN false; END; $$ language plpgsql; +-- helper function that returns true if output of given explain has "is not null" (case in-sensitive) +CREATE OR REPLACE FUNCTION explain_has_distributed_subplan(explain_commmand text) +RETURNS BOOLEAN AS $$ +DECLARE + query_plan text; +BEGIN + FOR query_plan IN EXECUTE explain_commmand LOOP + IF query_plan ILIKE '%Distributed Subplan %_%' + THEN + RETURN true; + END IF; + END LOOP; + RETURN false; +END; $$ language plpgsql; + -- helper function to quickly run SQL on the whole cluster CREATE OR REPLACE FUNCTION run_command_on_coordinator_and_workers(p_sql text) RETURNS void LANGUAGE plpgsql AS $$ diff --git a/src/test/regress/sql/union_pushdown.sql b/src/test/regress/sql/union_pushdown.sql index c020e2cc4..11655a701 100644 --- a/src/test/regress/sql/union_pushdown.sql +++ b/src/test/regress/sql/union_pushdown.sql @@ -30,6 +30,12 @@ CREATE TABLE events_table_part_8 PARTITION OF events_table_part FOR VALUES FROM SELECT create_distributed_table('events_table_part', 'user_id'); INSERT INTO events_table_part SELECT i, i %9, i %50 FROM generate_series(0, 100) i; +CREATE TABLE events_table_ref(user_id bigint, value_1 int, value_2 int); +SELECT create_reference_table('events_table_ref'); +INSERT INTO events_table_ref SELECT i, i %9, i %50 FROM generate_series(0, 100) i; + +CREATE TABLE events_table_local(user_id bigint, value_1 int, value_2 int); +INSERT INTO events_table_local SELECT i, i %9, i %50 FROM generate_series(0, 100) i; set client_min_messages to DEBUG1; -- a union all query with 2 different levels of UNION ALL @@ -230,5 +236,424 @@ SELECT DISTINCT user_id FROM USING (user_id) ORDER BY 1 LIMIT 1; +-- safe to pushdown +SELECT * FROM ( + (SELECT user_id FROM users_table_part UNION ALL SELECT * FROM + (SELECT user_id FROM users_table_part UNION ALL SELECT user_id FROM users_table_part) as bar1) as foo + JOIN + (SELECT user_id FROM users_table_part UNION ALL SELECT * FROM + (SELECT user_id FROM users_table_part UNION ALL SELECT user_id FROM users_table_part) as bar2) as bar +USING (user_id) +) +ORDER BY 1 LIMIT 1; + +-- UNION ALL leaf queries deep in the subquery +SELECT * FROM +( + SELECT * FROM (SELECT * FROM (SELECT * FROM (SELECT * FROM (SELECT * FROM (SELECT * FROM users_table_part) as level_5) as level_4) as level_3) as level_2) as level_1 + UNION ALL + SELECT * FROM (SELECT * FROM (SELECT * FROM (SELECT * FROM (SELECT * FROM (SELECT * FROM users_table_part) as level_5) as level_4) as level_3) as level_2) as level_1 +) as top_level ORDER BY 1 DESC LIMIT 3; + +-- UNION ALL leaf queries deep in the subquery +-- and random() calls prevent any pullup +SELECT user_id FROM +( + SELECT *,random() FROM (SELECT *,random() FROM (SELECT *,random() FROM (SELECT *,random() FROM (SELECT *,random() FROM (SELECT *,random() FROM users_table_part) as level_5) as level_4) as level_3) as level_2) as level_1 + UNION ALL + SELECT *,random() FROM (SELECT *,random() FROM (SELECT *,random() FROM (SELECT *,random() FROM (SELECT *,random() FROM (SELECT *,random() FROM users_table_part) as level_5) as level_4) as level_3) as level_2) as level_1 +) as top_level ORDER BY 1 DESC LIMIT 3; + + +-- UNION ALL leaf queries deep in the subquery +-- joined with a table +SELECT * FROM +( + SELECT * FROM (SELECT * FROM (SELECT * FROM (SELECT * FROM (SELECT * FROM (SELECT * FROM users_table_part UNION ALL SELECT * FROM users_table_part) as level_5) as level_4) as level_3) as level_2) as level_1 + UNION ALL + SELECT * FROM (SELECT * FROM (SELECT * FROM (SELECT * FROM (SELECT * FROM (SELECT * FROM users_table_part UNION ALL SELECT * FROM users_table_part) as level_5) as level_4) as level_3) as level_2) as level_1 +) as top_level +JOIN +events_table_part USING(user_id) + ORDER BY 1 DESC LIMIT 3; + +-- UNION ALL leaf queries deep in the subquery +-- and random() calls prevent any pullup +SELECT user_id FROM +( + SELECT *,random() FROM (SELECT *,random() FROM (SELECT *,random() FROM (SELECT *,random() FROM (SELECT *,random() FROM (SELECT *,random() FROM users_table_part UNION ALL SELECT *,1 FROM users_table_part) as level_5) as level_4) as level_3) as level_2) as level_1 + UNION ALL + SELECT *,random() FROM (SELECT *,random() FROM (SELECT *,random() FROM (SELECT *,random() FROM (SELECT *,random() FROM (SELECT *,random() FROM users_table_part UNION ALL SELECT *,2 FROM users_table_part) as level_5) as level_4) as level_3) as level_2) as level_1 +) as top_level +JOIN +events_table_part USING(user_id) + ORDER BY 1 DESC LIMIT 3; + +-- a tree with [Q1.1 JOIN Q1.2 UNION ALL Q2.1 JOIN Q2.2] JOIN [Q3.1 JOIN Q3.2 UNION ALL Q4.1 JOIN Q4.2] +-- can be pushed down +SELECT * FROM ( + SELECT * FROM + ( + (((SELECT * FROM users_table_part JOIN events_table_part USING (user_id)) UNION ALL (SELECT * FROM users_table_part JOIN events_table_part USING (user_id))) as l1 + JOIN + ((SELECT * FROM users_table_part JOIN events_table_part USING (user_id)) UNION ALL (SELECT * FROM users_table_part JOIN events_table_part USING (user_id))) as l2 + USING(user_id)) + ) as left_subquery + + UNION ALL + + SELECT * FROM + ( + (((SELECT * FROM users_table_part JOIN events_table_part USING (user_id)) UNION ALL (SELECT * FROM users_table_part JOIN events_table_part USING (user_id))) as l1 + JOIN + ((SELECT * FROM users_table_part JOIN events_table_part USING (user_id)) UNION ALL (SELECT * FROM users_table_part JOIN events_table_part USING (user_id))) as l2 + USING(user_id)) + ) as right_subquery +) as top_level_left + JOIN +( + SELECT * FROM + ( + (((SELECT * FROM users_table_part JOIN events_table_part USING (user_id)) UNION ALL (SELECT * FROM users_table_part JOIN events_table_part USING (user_id))) as l1 + JOIN + ((SELECT * FROM users_table_part JOIN events_table_part USING (user_id)) UNION ALL (SELECT * FROM users_table_part JOIN events_table_part USING (user_id))) as l2 + USING(user_id)) + ) as left_subquery + UNION ALL + SELECT * FROM + ( + (((SELECT * FROM users_table_part JOIN events_table_part USING (user_id)) UNION ALL (SELECT * FROM users_table_part JOIN events_table_part USING (user_id))) as l1 + JOIN + ((SELECT * FROM users_table_part JOIN events_table_part USING (user_id)) UNION ALL (SELECT * FROM users_table_part JOIN events_table_part USING (user_id))) as l2 + USING(user_id)) + ) as right_subquery +) as top_level_righy USING (user_id) +ORDER BY user_id DESC +LIMIT 1; + +-- a tree with [Q1.1 JOIN Q1.2 UNION ALL Q2.1 JOIN Q2.2] JOIN [Q3.1 JOIN Q3.2 UNION ALL Q4.1 JOIN Q4.2] +-- can be pushed down with reference tables +SELECT * FROM ( + SELECT * FROM + ( + (((SELECT * FROM users_table_part JOIN events_table_ref USING (user_id)) UNION ALL (SELECT * FROM users_table_part JOIN events_table_ref USING (user_id))) as l1 + JOIN + ((SELECT * FROM users_table_part JOIN events_table_ref USING (user_id)) UNION ALL (SELECT * FROM users_table_part JOIN events_table_ref USING (user_id))) as l2 + USING(user_id)) + ) as left_subquery + + UNION ALL + + SELECT * FROM + ( + (((SELECT * FROM users_table_part JOIN events_table_ref USING (user_id)) UNION ALL (SELECT * FROM users_table_part JOIN events_table_ref USING (user_id))) as l1 + JOIN + ((SELECT * FROM users_table_part JOIN events_table_ref USING (user_id)) UNION ALL (SELECT * FROM users_table_part JOIN events_table_ref USING (user_id))) as l2 + USING(user_id)) + ) as right_subquery +) as top_level_left + JOIN +( + SELECT * FROM + ( + (((SELECT * FROM users_table_part JOIN events_table_ref USING (user_id)) UNION ALL (SELECT * FROM users_table_part JOIN events_table_ref USING (user_id))) as l1 + JOIN + ((SELECT * FROM users_table_part JOIN events_table_ref USING (user_id)) UNION ALL (SELECT * FROM users_table_part JOIN events_table_ref USING (user_id))) as l2 + USING(user_id)) + ) as left_subquery + UNION ALL + SELECT * FROM + ( + (((SELECT * FROM users_table_part JOIN events_table_ref USING (user_id)) UNION ALL (SELECT * FROM users_table_part JOIN events_table_ref USING (user_id))) as l1 + JOIN + ((SELECT * FROM users_table_part JOIN events_table_ref USING (user_id)) UNION ALL (SELECT * FROM users_table_part JOIN events_table_ref USING (user_id))) as l2 + USING(user_id)) + ) as right_subquery +) as top_level_righy USING (user_id) +ORDER BY user_id DESC +LIMIT 1; + +-- a tree with [Q1.1 JOIN Q1.2 UNION ALL Q2.1 JOIN Q2.2] JOIN [Q3.1 JOIN Q3.2 UNION ALL Q4.1 JOIN Q4.2] +-- can be pushed down with local tables after local tables have been recursively planned +SELECT * FROM ( + SELECT * FROM + ( + (((SELECT * FROM users_table_part JOIN events_table_local USING (user_id)) UNION ALL (SELECT * FROM users_table_part JOIN events_table_local USING (user_id))) as l1 + JOIN + ((SELECT * FROM users_table_part JOIN events_table_local USING (user_id)) UNION ALL (SELECT * FROM users_table_part JOIN events_table_local USING (user_id))) as l2 + USING(user_id)) + ) as left_subquery + + UNION ALL + + SELECT * FROM + ( + (((SELECT * FROM users_table_part JOIN events_table_local USING (user_id)) UNION ALL (SELECT * FROM users_table_part JOIN events_table_local USING (user_id))) as l1 + JOIN + ((SELECT * FROM users_table_part JOIN events_table_local USING (user_id)) UNION ALL (SELECT * FROM users_table_part JOIN events_table_local USING (user_id))) as l2 + USING(user_id)) + ) as right_subquery +) as top_level_left + JOIN +( + SELECT * FROM + ( + (((SELECT * FROM users_table_part JOIN events_table_local USING (user_id)) UNION ALL (SELECT * FROM users_table_part JOIN events_table_local USING (user_id))) as l1 + JOIN + ((SELECT * FROM users_table_part JOIN events_table_local USING (user_id)) UNION ALL (SELECT * FROM users_table_part JOIN events_table_local USING (user_id))) as l2 + USING(user_id)) + ) as left_subquery + UNION ALL + SELECT * FROM + ( + (((SELECT * FROM users_table_part JOIN events_table_local USING (user_id)) UNION ALL (SELECT * FROM users_table_part JOIN events_table_local USING (user_id))) as l1 + JOIN + ((SELECT * FROM users_table_part JOIN events_table_local USING (user_id)) UNION ALL (SELECT * FROM users_table_part JOIN events_table_local USING (user_id))) as l2 + USING(user_id)) + ) as right_subquery +) as top_level_righy USING (user_id) +ORDER BY user_id DESC +LIMIT 1; + +-- a subquery in WHERE clause with +-- a tree with [Q1.1 JOIN Q1.2 UNION ALL Q2.1 JOIN Q2.2] JOIN [Q3.1 JOIN Q3.2 UNION ALL Q4.1 JOIN Q4.2] +-- can be pushed down with FROM tree consisting of JOINs/UNION ALLs +SELECT * FROM +users_table_part u1 + JOIN +events_table_part e1 USING (user_id) + JOIN +users_table_part u2 USING (user_id) + JOIN +(SELECT * FROM users_table_part UNION ALL SELECT * FROM events_table_part) as foo USING (user_id) +WHERE user_id IN + (SELECT user_id FROM ( + SELECT * FROM + ( + (((SELECT * FROM users_table_part JOIN events_table_part USING (user_id)) UNION ALL (SELECT * FROM users_table_part JOIN events_table_part USING (user_id))) as l1 + JOIN + ((SELECT * FROM users_table_part JOIN events_table_part USING (user_id)) UNION ALL (SELECT * FROM users_table_part JOIN events_table_part USING (user_id))) as l2 + USING(user_id)) + ) as left_subquery + UNION ALL + SELECT * FROM + ( + (((SELECT * FROM users_table_part JOIN events_table_part USING (user_id)) UNION ALL (SELECT * FROM users_table_part JOIN events_table_part USING (user_id))) as l1 + JOIN + ((SELECT * FROM users_table_part JOIN events_table_part USING (user_id)) UNION ALL (SELECT * FROM users_table_part JOIN events_table_part USING (user_id))) as l2 + USING(user_id)) + ) as right_subquery +) as top_level_left + JOIN +( + SELECT * FROM + ( + (((SELECT * FROM users_table_part JOIN events_table_part USING (user_id)) UNION ALL (SELECT * FROM users_table_part JOIN events_table_part USING (user_id))) as l1 + JOIN + ((SELECT * FROM users_table_part JOIN events_table_part USING (user_id)) UNION ALL (SELECT * FROM users_table_part JOIN events_table_part USING (user_id))) as l2 + USING(user_id)) + ) as left_subquery + UNION ALL + SELECT * FROM + ( + (((SELECT * FROM users_table_part JOIN events_table_part USING (user_id)) UNION ALL (SELECT * FROM users_table_part JOIN events_table_part USING (user_id))) as l1 + JOIN + ((SELECT * FROM users_table_part JOIN events_table_part USING (user_id)) UNION ALL (SELECT * FROM users_table_part JOIN events_table_part USING (user_id))) as l2 + USING(user_id)) + ) as right_subquery +) as top_level_righy USING (user_id) +ORDER BY user_id DESC +) ORDER BY 1 LIMIT 1; + +--------------------------------------------------------------------------- +------------ The following tests ensure that we do not accidentally pushdown +------------ queries involving UNION ALL queries if the distribution keys do +------------ not match or any JOIN is not on the distribution key +------------ We used the queries that are defined above +--------------------------------------------------------------------------- +RESET client_min_messages; +SELECT public.explain_has_distributed_subplan($$ + EXPLAIN SELECT * FROM ((SELECT 1 FROM events_table_part) UNION ALL (SELECT 1 FROM events_table_part)) u;$$); + +SELECT public.explain_has_distributed_subplan($$ + EXPLAIN SELECT * FROM ((SELECT random() FROM events_table_part) UNION ALL (SELECT user_id FROM events_table_part)) u;$$); + +SELECT public.explain_has_distributed_subplan($$ + EXPLAIN SELECT * FROM ((SELECT user_id FROM events_table_part) UNION ALL (SELECT user_id - 1 FROM events_table_part)) u;$$); + +SELECT public.explain_has_distributed_subplan($$ + EXPLAIN SELECT * FROM ((SELECT user_id FROM events_table_part) UNION ALL (SELECT user_id - 1 as user_id FROM events_table_part)) u + JOIN users_table_part USING(user_id);$$); + +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN SELECT * FROM +( + SELECT events_table_part.value_1 FROM users_table_part JOIN events_table_part USING (user_id) + UNION ALL + SELECT events_table_part.value_1 FROM users_table_part JOIN events_table_part USING (user_id) +) as bar;$$); + +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN SELECT COUNT(*) +FROM + (SELECT user_id AS user_id + FROM + (SELECT value_1 AS user_id + FROM users_table_part + UNION ALL SELECT user_id AS user_id + FROM users_table_part) AS bar + UNION ALL SELECT user_id AS user_id + FROM users_table_part) AS fool$$); + +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN SELECT COUNT(*) +FROM + (SELECT user_id AS user_id + FROM + (SELECT count(*) AS user_id + FROM users_table_part GROUP BY user_id + UNION ALL SELECT user_id AS user_id + FROM users_table_part) AS bar + UNION ALL SELECT user_id AS user_id + FROM users_table_part) AS fool$$); + +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN SELECT * FROM +( + SELECT * FROM (SELECT * FROM (SELECT * FROM (SELECT * FROM (SELECT * FROM (SELECT value_1, user_id FROM users_table_part) as level_5) as level_4) as level_3) as level_2) as level_1 + UNION ALL + SELECT * FROM (SELECT * FROM (SELECT * FROM (SELECT * FROM (SELECT * FROM (SELECT user_id, value_1 FROM users_table_part) as level_5) as level_4) as level_3) as level_2) as level_1 +) as top_level$$); + +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN SELECT user_id FROM +( + SELECT *,random() FROM (SELECT *,random() FROM (SELECT *,random() FROM (SELECT *,random() FROM (SELECT *,random() FROM (SELECT value_1 as user_id,random() FROM users_table_part) as level_5) as level_4) as level_3) as level_2) as level_1 + UNION ALL + SELECT *,random() FROM (SELECT *,random() FROM (SELECT *,random() FROM (SELECT *,random() FROM (SELECT *,random() FROM (SELECT user_id,random() FROM users_table_part) as level_5) as level_4) as level_3) as level_2) as level_1 +) as top_level +$$); + +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN SELECT * FROM + ( + (((SELECT users_table_part.value_1 as user_id FROM users_table_part JOIN events_table_part USING (user_id)) UNION ALL (SELECT users_table_part.user_id as user_id FROM users_table_part JOIN events_table_part USING (user_id))) as l1 + JOIN + ((SELECT users_table_part.value_1 as user_id FROM users_table_part JOIN events_table_part USING (user_id)) UNION ALL (SELECT users_table_part.user_id as user_id FROM users_table_part JOIN events_table_part USING (user_id))) as l2 + USING(user_id)) + ) as left_subquery + UNION ALL + SELECT * FROM + ( + (((SELECT users_table_part.value_1 as user_id FROM users_table_part JOIN events_table_part USING (user_id)) UNION ALL (SELECT users_table_part.user_id as user_id FROM users_table_part JOIN events_table_part USING (user_id))) as l1 + JOIN + ((SELECT users_table_part.value_1 as user_id FROM users_table_part JOIN events_table_part USING (user_id)) UNION ALL (SELECT users_table_part.user_id as user_id FROM users_table_part JOIN events_table_part USING (user_id))) as l2 + USING(user_id)) + ) as right_subquery +$$); + +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN SELECT * FROM +( + SELECT * FROM (SELECT * FROM (SELECT * FROM (SELECT * FROM (SELECT * FROM (SELECT user_id, value_1 FROM users_table_part UNION ALL SELECT user_id, value_1 FROM users_table_part) as level_5) as level_4) as level_3) as level_2) as level_1 + UNION ALL + SELECT * FROM (SELECT * FROM (SELECT * FROM (SELECT * FROM (SELECT * FROM (SELECT value_1, user_id FROM users_table_part UNION ALL SELECT value_1, user_id FROM users_table_part) as level_5) as level_4) as level_3) as level_2) as level_1 +) as top_level +JOIN +events_table_part USING(user_id) + ORDER BY 1 DESC LIMIT 3; +$$); + +-- we can pushdown UNION ALL queries that are correlated and exists +-- on the SELECT clause +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN + SELECT + (SELECT count(*) FROM users_table_part WHERE user_id = e.user_id + UNION ALL + SELECT count(*) FROM users_table_part WHERE user_id = e.user_id) +FROM + (SELECT * FROM users_table_part UNION ALL SELECT * FROM users_table_part) as e; +$$); + +-- even if the UNION ALL is not on the distribution key +-- it is safe to pushdown the query because all tables are joined +-- on the distribution keys +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN + SELECT + (SELECT user_id FROM users_table_part WHERE user_id = e.user_id + UNION ALL + SELECT value_1 FROM users_table_part WHERE user_id = e.user_id) +FROM + (SELECT * FROM users_table_part UNION ALL SELECT * FROM users_table_part) as e; +$$); + + +-- but if the join is not on the distribution key +-- Citus throws an error + EXPLAIN + SELECT + (SELECT user_id FROM users_table_part WHERE user_id = e.value_1 + UNION ALL + SELECT user_id FROM users_table_part WHERE user_id = e.value_1) + FROM + (SELECT * FROM users_table_part) as e; + +-- correlated subquery should be able to pushdown +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN +SELECT * FROM +users_table_part e JOIN LATERAL +(SELECT value_1 FROM users_table_part WHERE user_id = e.user_id + UNION ALL + SELECT value_1 FROM users_table_part WHERE user_id = e.user_id) as foo ON (true); +$$); + +-- correlated subquery should be able to pushdown +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN +SELECT + (SELECT + avg(count) + FROM + (SELECT count(*) as count from users_table_part where users_table_part.user_id = u_low.user_id + UNION ALL + SELECT count(*) from users_table_part where users_table_part.user_id = u_low.user_id) b) + FROM users_table_part u_low; +$$); + + +-- we cannot pushdown if one side of the UNION ALL +-- is a reference table +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN +SELECT * +FROM + (SELECT * + FROM events_table_ref + UNION ALL SELECT events_table_ref.* + FROM events_table_part + JOIN events_table_ref USING(user_id)) AS foo +JOIN users_table_part USING(user_id) +LIMIT 1; +$$); + +-- we cannot pushdown if one side of the UNION ALL +-- is a local table +SELECT public.explain_has_distributed_subplan($$ +EXPLAIN +SELECT * +FROM + (SELECT * + FROM events_table_local + UNION ALL SELECT events_table_local.* + FROM events_table_part + JOIN events_table_local USING(user_id)) AS foo +JOIN users_table_part USING(user_id) +LIMIT 1; +$$); + + RESET client_min_messages; DROP SCHEMA union_pushdown CASCADE;