Prevent infinite recursion for queries that involve UNION ALL and JOIN

With this commit, we make sure to prevent infinite recursion for queries
in the format: [subquery with a UNION ALL] JOIN [table or subquery]

Also, fixes a bug where we pushdown UNION ALL below a JOIN even if the
UNION ALL is not safe to pushdown.

(cherry picked from commit d1cd198655)
pull/5009/head
Onder Kalaci 2021-02-25 11:41:55 +01:00 committed by Sait Talha Nisanci
parent 18c7a3c188
commit e9bf5fa235
8 changed files with 1145 additions and 13 deletions

View File

@ -2203,6 +2203,33 @@ CreateAndPushPlannerRestrictionContext(void)
}
/*
* TranslatedVarsForRteIdentity gets an rteIdentity and returns the
* translatedVars that belong to the range table relation. If no
* translatedVars found, the function returns NIL;
*/
List *
TranslatedVarsForRteIdentity(int rteIdentity)
{
PlannerRestrictionContext *currentPlannerRestrictionContext =
CurrentPlannerRestrictionContext();
List *relationRestrictionList =
currentPlannerRestrictionContext->relationRestrictionContext->
relationRestrictionList;
RelationRestriction *relationRestriction = NULL;
foreach_ptr(relationRestriction, relationRestrictionList)
{
if (GetRTEIdentity(relationRestriction->rte) == rteIdentity)
{
return relationRestriction->translatedVars;
}
}
return NIL;
}
/*
* CurrentRestrictionContext returns the most recently added
* PlannerRestrictionContext from the plannerRestrictionContextList list.

View File

@ -61,6 +61,8 @@ typedef struct AttributeEquivalenceClass
{
uint32 equivalenceId;
List *equivalentAttributes;
Index unionQueryPartitionKeyIndex;
} AttributeEquivalenceClass;
/*
@ -163,6 +165,7 @@ static Relids QueryRteIdentities(Query *queryTree);
static int ParentCountPriorToAppendRel(List *appendRelList, AppendRelInfo *appendRelInfo);
#endif
/*
* AllDistributionKeysInQueryAreEqual returns true if either
* (i) there exists join in the query and all relations joined on their
@ -253,7 +256,7 @@ SafeToPushdownUnionSubquery(PlannerRestrictionContext *plannerRestrictionContext
plannerRestrictionContext->relationRestrictionContext;
JoinRestrictionContext *joinRestrictionContext =
plannerRestrictionContext->joinRestrictionContext;
Index unionQueryPartitionKeyIndex = 0;
AttributeEquivalenceClass *attributeEquivalence =
palloc0(sizeof(AttributeEquivalenceClass));
ListCell *relationRestrictionCell = NULL;
@ -328,11 +331,11 @@ SafeToPushdownUnionSubquery(PlannerRestrictionContext *plannerRestrictionContext
* we check whether all the relations have partition keys in the
* same position.
*/
if (unionQueryPartitionKeyIndex == InvalidAttrNumber)
if ((attributeEquivalence)->unionQueryPartitionKeyIndex == InvalidAttrNumber)
{
unionQueryPartitionKeyIndex = partitionKeyIndex;
(attributeEquivalence)->unionQueryPartitionKeyIndex = partitionKeyIndex;
}
else if (unionQueryPartitionKeyIndex != partitionKeyIndex)
else if ((attributeEquivalence)->unionQueryPartitionKeyIndex != partitionKeyIndex)
{
continue;
}
@ -431,6 +434,13 @@ static Var *
FindUnionAllVar(PlannerInfo *root, List *translatedVars, Oid relationOid,
Index relationRteIndex, Index *partitionKeyIndex)
{
if (!IsCitusTableType(relationOid, STRICTLY_PARTITIONED_DISTRIBUTED_TABLE))
{
/* we only care about hash and range partitioned tables */
*partitionKeyIndex = 0;
return NULL;
}
Var *relationPartitionKey = DistPartitionKeyOrError(relationOid);
AttrNumber childAttrNumber = 0;
@ -439,7 +449,6 @@ FindUnionAllVar(PlannerInfo *root, List *translatedVars, Oid relationOid,
foreach(translatedVarCell, translatedVars)
{
Node *targetNode = (Node *) lfirst(translatedVarCell);
childAttrNumber++;
if (!IsA(targetNode, Var))
@ -586,7 +595,6 @@ GenerateAllAttributeEquivalences(PlannerRestrictionContext *plannerRestrictionCo
JoinRestrictionContext *joinRestrictionContext =
plannerRestrictionContext->joinRestrictionContext;
/* reset the equivalence id counter per call to prevent overflows */
attributeEquivalenceId = 1;
@ -1241,7 +1249,8 @@ static void
AddRteSubqueryToAttributeEquivalenceClass(AttributeEquivalenceClass
**attributeEquivalenceClass,
RangeTblEntry *rangeTableEntry,
PlannerInfo *root, Var *varToBeAdded)
PlannerInfo *root,
Var *varToBeAdded)
{
RelOptInfo *baseRelOptInfo = find_base_rel(root, varToBeAdded->varno);
Query *targetSubquery = GetTargetSubquery(root, rangeTableEntry, varToBeAdded);
@ -1383,12 +1392,71 @@ AddUnionAllSetOperationsToAttributeEquivalenceClass(AttributeEquivalenceClass **
continue;
}
int rtoffset = RangeTableOffsetCompat(root, appendRelInfo);
int childRelId = appendRelInfo->child_relid - rtoffset;
/* set the varno accordingly for this specific child */
varToBeAdded->varno = appendRelInfo->child_relid - rtoffset;
if (root->simple_rel_array_size <= childRelId)
{
/* we prefer to return over an Assert or error to be defensive */
return;
}
AddToAttributeEquivalenceClass(attributeEquivalenceClass, root,
varToBeAdded);
RangeTblEntry *rte = root->simple_rte_array[childRelId];
if (rte->inh)
{
/*
* This code-path may require improvements. If a leaf of a UNION ALL
* (e.g., an entry in appendRelList) itself is another UNION ALL
* (e.g., rte->inh = true), the logic here might get into an infinite
* recursion.
*
* The downside of "continue" here is that certain UNION ALL queries
* that are safe to pushdown may not be pushed down.
*/
continue;
}
else if (rte->rtekind == RTE_RELATION)
{
Index partitionKeyIndex = 0;
List *translatedVars = TranslatedVarsForRteIdentity(GetRTEIdentity(rte));
Var *varToBeAddedOnUnionAllSubquery =
FindUnionAllVar(root, translatedVars, rte->relid, childRelId,
&partitionKeyIndex);
if (partitionKeyIndex == 0)
{
/* no partition key on the target list */
continue;
}
if ((*attributeEquivalenceClass)->unionQueryPartitionKeyIndex == 0)
{
/* the first partition key index we found */
(*attributeEquivalenceClass)->unionQueryPartitionKeyIndex =
partitionKeyIndex;
}
else if ((*attributeEquivalenceClass)->unionQueryPartitionKeyIndex !=
partitionKeyIndex)
{
/*
* Partition keys on the leaves of the UNION ALL queries on
* different ordinal positions. We cannot pushdown, so skip.
*/
continue;
}
if (varToBeAddedOnUnionAllSubquery != NULL)
{
AddToAttributeEquivalenceClass(attributeEquivalenceClass, root,
varToBeAddedOnUnionAllSubquery);
}
}
else
{
/* set the varno accordingly for this specific child */
varToBeAdded->varno = childRelId;
AddToAttributeEquivalenceClass(attributeEquivalenceClass, root,
varToBeAdded);
}
}
}

View File

@ -222,9 +222,9 @@ extern PlannedStmt * distributed_planner(Query *parse,
#define LOCAL_TABLE_SUBQUERY_CTE_HINT \
"Use CTE's or subqueries to select from local tables and use them in joins"
extern List * ExtractRangeTableEntryList(Query *query);
extern bool NeedsDistributedPlanning(Query *query);
extern List * TranslatedVarsForRteIdentity(int rteIdentity);
extern struct DistributedPlan * GetDistributedPlan(CustomScan *node);
extern void multi_relation_restriction_hook(PlannerInfo *root, RelOptInfo *relOptInfo,
Index restrictionIndex, RangeTblEntry *rte);

View File

@ -43,6 +43,20 @@ BEGIN
END LOOP;
RETURN false;
END; $$ language plpgsql;
-- helper function that returns true if output of given explain has "is not null" (case in-sensitive)
CREATE OR REPLACE FUNCTION explain_has_distributed_subplan(explain_commmand text)
RETURNS BOOLEAN AS $$
DECLARE
query_plan text;
BEGIN
FOR query_plan IN EXECUTE explain_commmand LOOP
IF query_plan ILIKE '%Distributed Subplan %_%'
THEN
RETURN true;
END IF;
END LOOP;
RETURN false;
END; $$ language plpgsql;
-- helper function to quickly run SQL on the whole cluster
CREATE OR REPLACE FUNCTION run_command_on_coordinator_and_workers(p_sql text)
RETURNS void LANGUAGE plpgsql AS $$

View File

@ -317,6 +317,14 @@ DEBUG: Router planner cannot handle multi-shard select queries
SELECT * FROM ((SELECT x, y FROM test) UNION ALL (SELECT y, x FROM test)) u ORDER BY 1,2;
DEBUG: Router planner cannot handle multi-shard select queries
DEBUG: Router planner cannot handle multi-shard select queries
DEBUG: generating subplan XXX_1 for subquery SELECT x, y FROM recursive_union.test
DEBUG: Router planner cannot handle multi-shard select queries
DEBUG: generating subplan XXX_2 for subquery SELECT y, x FROM recursive_union.test
DEBUG: Creating router plan
DEBUG: generating subplan XXX_3 for subquery SELECT intermediate_result.x, intermediate_result.y FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(x integer, y integer) UNION ALL SELECT intermediate_result.y, intermediate_result.x FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(y integer, x integer)
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT x, y FROM (SELECT intermediate_result.x, intermediate_result.y FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(x integer, y integer)) u ORDER BY x, y
DEBUG: Creating router plan
x | y
---------------------------------------------------------------------
1 | 1

File diff suppressed because one or more lines are too long

View File

@ -48,6 +48,21 @@ BEGIN
RETURN false;
END; $$ language plpgsql;
-- helper function that returns true if output of given explain has "is not null" (case in-sensitive)
CREATE OR REPLACE FUNCTION explain_has_distributed_subplan(explain_commmand text)
RETURNS BOOLEAN AS $$
DECLARE
query_plan text;
BEGIN
FOR query_plan IN EXECUTE explain_commmand LOOP
IF query_plan ILIKE '%Distributed Subplan %_%'
THEN
RETURN true;
END IF;
END LOOP;
RETURN false;
END; $$ language plpgsql;
-- helper function to quickly run SQL on the whole cluster
CREATE OR REPLACE FUNCTION run_command_on_coordinator_and_workers(p_sql text)
RETURNS void LANGUAGE plpgsql AS $$

View File

@ -30,6 +30,12 @@ CREATE TABLE events_table_part_8 PARTITION OF events_table_part FOR VALUES FROM
SELECT create_distributed_table('events_table_part', 'user_id');
INSERT INTO events_table_part SELECT i, i %9, i %50 FROM generate_series(0, 100) i;
CREATE TABLE events_table_ref(user_id bigint, value_1 int, value_2 int);
SELECT create_reference_table('events_table_ref');
INSERT INTO events_table_ref SELECT i, i %9, i %50 FROM generate_series(0, 100) i;
CREATE TABLE events_table_local(user_id bigint, value_1 int, value_2 int);
INSERT INTO events_table_local SELECT i, i %9, i %50 FROM generate_series(0, 100) i;
set client_min_messages to DEBUG1;
-- a union all query with 2 different levels of UNION ALL
@ -230,5 +236,424 @@ SELECT DISTINCT user_id FROM
USING (user_id)
ORDER BY 1 LIMIT 1;
-- safe to pushdown
SELECT * FROM (
(SELECT user_id FROM users_table_part UNION ALL SELECT * FROM
(SELECT user_id FROM users_table_part UNION ALL SELECT user_id FROM users_table_part) as bar1) as foo
JOIN
(SELECT user_id FROM users_table_part UNION ALL SELECT * FROM
(SELECT user_id FROM users_table_part UNION ALL SELECT user_id FROM users_table_part) as bar2) as bar
USING (user_id)
)
ORDER BY 1 LIMIT 1;
-- UNION ALL leaf queries deep in the subquery
SELECT * FROM
(
SELECT * FROM (SELECT * FROM (SELECT * FROM (SELECT * FROM (SELECT * FROM (SELECT * FROM users_table_part) as level_5) as level_4) as level_3) as level_2) as level_1
UNION ALL
SELECT * FROM (SELECT * FROM (SELECT * FROM (SELECT * FROM (SELECT * FROM (SELECT * FROM users_table_part) as level_5) as level_4) as level_3) as level_2) as level_1
) as top_level ORDER BY 1 DESC LIMIT 3;
-- UNION ALL leaf queries deep in the subquery
-- and random() calls prevent any pullup
SELECT user_id FROM
(
SELECT *,random() FROM (SELECT *,random() FROM (SELECT *,random() FROM (SELECT *,random() FROM (SELECT *,random() FROM (SELECT *,random() FROM users_table_part) as level_5) as level_4) as level_3) as level_2) as level_1
UNION ALL
SELECT *,random() FROM (SELECT *,random() FROM (SELECT *,random() FROM (SELECT *,random() FROM (SELECT *,random() FROM (SELECT *,random() FROM users_table_part) as level_5) as level_4) as level_3) as level_2) as level_1
) as top_level ORDER BY 1 DESC LIMIT 3;
-- UNION ALL leaf queries deep in the subquery
-- joined with a table
SELECT * FROM
(
SELECT * FROM (SELECT * FROM (SELECT * FROM (SELECT * FROM (SELECT * FROM (SELECT * FROM users_table_part UNION ALL SELECT * FROM users_table_part) as level_5) as level_4) as level_3) as level_2) as level_1
UNION ALL
SELECT * FROM (SELECT * FROM (SELECT * FROM (SELECT * FROM (SELECT * FROM (SELECT * FROM users_table_part UNION ALL SELECT * FROM users_table_part) as level_5) as level_4) as level_3) as level_2) as level_1
) as top_level
JOIN
events_table_part USING(user_id)
ORDER BY 1 DESC LIMIT 3;
-- UNION ALL leaf queries deep in the subquery
-- and random() calls prevent any pullup
SELECT user_id FROM
(
SELECT *,random() FROM (SELECT *,random() FROM (SELECT *,random() FROM (SELECT *,random() FROM (SELECT *,random() FROM (SELECT *,random() FROM users_table_part UNION ALL SELECT *,1 FROM users_table_part) as level_5) as level_4) as level_3) as level_2) as level_1
UNION ALL
SELECT *,random() FROM (SELECT *,random() FROM (SELECT *,random() FROM (SELECT *,random() FROM (SELECT *,random() FROM (SELECT *,random() FROM users_table_part UNION ALL SELECT *,2 FROM users_table_part) as level_5) as level_4) as level_3) as level_2) as level_1
) as top_level
JOIN
events_table_part USING(user_id)
ORDER BY 1 DESC LIMIT 3;
-- a tree with [Q1.1 JOIN Q1.2 UNION ALL Q2.1 JOIN Q2.2] JOIN [Q3.1 JOIN Q3.2 UNION ALL Q4.1 JOIN Q4.2]
-- can be pushed down
SELECT * FROM (
SELECT * FROM
(
(((SELECT * FROM users_table_part JOIN events_table_part USING (user_id)) UNION ALL (SELECT * FROM users_table_part JOIN events_table_part USING (user_id))) as l1
JOIN
((SELECT * FROM users_table_part JOIN events_table_part USING (user_id)) UNION ALL (SELECT * FROM users_table_part JOIN events_table_part USING (user_id))) as l2
USING(user_id))
) as left_subquery
UNION ALL
SELECT * FROM
(
(((SELECT * FROM users_table_part JOIN events_table_part USING (user_id)) UNION ALL (SELECT * FROM users_table_part JOIN events_table_part USING (user_id))) as l1
JOIN
((SELECT * FROM users_table_part JOIN events_table_part USING (user_id)) UNION ALL (SELECT * FROM users_table_part JOIN events_table_part USING (user_id))) as l2
USING(user_id))
) as right_subquery
) as top_level_left
JOIN
(
SELECT * FROM
(
(((SELECT * FROM users_table_part JOIN events_table_part USING (user_id)) UNION ALL (SELECT * FROM users_table_part JOIN events_table_part USING (user_id))) as l1
JOIN
((SELECT * FROM users_table_part JOIN events_table_part USING (user_id)) UNION ALL (SELECT * FROM users_table_part JOIN events_table_part USING (user_id))) as l2
USING(user_id))
) as left_subquery
UNION ALL
SELECT * FROM
(
(((SELECT * FROM users_table_part JOIN events_table_part USING (user_id)) UNION ALL (SELECT * FROM users_table_part JOIN events_table_part USING (user_id))) as l1
JOIN
((SELECT * FROM users_table_part JOIN events_table_part USING (user_id)) UNION ALL (SELECT * FROM users_table_part JOIN events_table_part USING (user_id))) as l2
USING(user_id))
) as right_subquery
) as top_level_righy USING (user_id)
ORDER BY user_id DESC
LIMIT 1;
-- a tree with [Q1.1 JOIN Q1.2 UNION ALL Q2.1 JOIN Q2.2] JOIN [Q3.1 JOIN Q3.2 UNION ALL Q4.1 JOIN Q4.2]
-- can be pushed down with reference tables
SELECT * FROM (
SELECT * FROM
(
(((SELECT * FROM users_table_part JOIN events_table_ref USING (user_id)) UNION ALL (SELECT * FROM users_table_part JOIN events_table_ref USING (user_id))) as l1
JOIN
((SELECT * FROM users_table_part JOIN events_table_ref USING (user_id)) UNION ALL (SELECT * FROM users_table_part JOIN events_table_ref USING (user_id))) as l2
USING(user_id))
) as left_subquery
UNION ALL
SELECT * FROM
(
(((SELECT * FROM users_table_part JOIN events_table_ref USING (user_id)) UNION ALL (SELECT * FROM users_table_part JOIN events_table_ref USING (user_id))) as l1
JOIN
((SELECT * FROM users_table_part JOIN events_table_ref USING (user_id)) UNION ALL (SELECT * FROM users_table_part JOIN events_table_ref USING (user_id))) as l2
USING(user_id))
) as right_subquery
) as top_level_left
JOIN
(
SELECT * FROM
(
(((SELECT * FROM users_table_part JOIN events_table_ref USING (user_id)) UNION ALL (SELECT * FROM users_table_part JOIN events_table_ref USING (user_id))) as l1
JOIN
((SELECT * FROM users_table_part JOIN events_table_ref USING (user_id)) UNION ALL (SELECT * FROM users_table_part JOIN events_table_ref USING (user_id))) as l2
USING(user_id))
) as left_subquery
UNION ALL
SELECT * FROM
(
(((SELECT * FROM users_table_part JOIN events_table_ref USING (user_id)) UNION ALL (SELECT * FROM users_table_part JOIN events_table_ref USING (user_id))) as l1
JOIN
((SELECT * FROM users_table_part JOIN events_table_ref USING (user_id)) UNION ALL (SELECT * FROM users_table_part JOIN events_table_ref USING (user_id))) as l2
USING(user_id))
) as right_subquery
) as top_level_righy USING (user_id)
ORDER BY user_id DESC
LIMIT 1;
-- a tree with [Q1.1 JOIN Q1.2 UNION ALL Q2.1 JOIN Q2.2] JOIN [Q3.1 JOIN Q3.2 UNION ALL Q4.1 JOIN Q4.2]
-- can be pushed down with local tables after local tables have been recursively planned
SELECT * FROM (
SELECT * FROM
(
(((SELECT * FROM users_table_part JOIN events_table_local USING (user_id)) UNION ALL (SELECT * FROM users_table_part JOIN events_table_local USING (user_id))) as l1
JOIN
((SELECT * FROM users_table_part JOIN events_table_local USING (user_id)) UNION ALL (SELECT * FROM users_table_part JOIN events_table_local USING (user_id))) as l2
USING(user_id))
) as left_subquery
UNION ALL
SELECT * FROM
(
(((SELECT * FROM users_table_part JOIN events_table_local USING (user_id)) UNION ALL (SELECT * FROM users_table_part JOIN events_table_local USING (user_id))) as l1
JOIN
((SELECT * FROM users_table_part JOIN events_table_local USING (user_id)) UNION ALL (SELECT * FROM users_table_part JOIN events_table_local USING (user_id))) as l2
USING(user_id))
) as right_subquery
) as top_level_left
JOIN
(
SELECT * FROM
(
(((SELECT * FROM users_table_part JOIN events_table_local USING (user_id)) UNION ALL (SELECT * FROM users_table_part JOIN events_table_local USING (user_id))) as l1
JOIN
((SELECT * FROM users_table_part JOIN events_table_local USING (user_id)) UNION ALL (SELECT * FROM users_table_part JOIN events_table_local USING (user_id))) as l2
USING(user_id))
) as left_subquery
UNION ALL
SELECT * FROM
(
(((SELECT * FROM users_table_part JOIN events_table_local USING (user_id)) UNION ALL (SELECT * FROM users_table_part JOIN events_table_local USING (user_id))) as l1
JOIN
((SELECT * FROM users_table_part JOIN events_table_local USING (user_id)) UNION ALL (SELECT * FROM users_table_part JOIN events_table_local USING (user_id))) as l2
USING(user_id))
) as right_subquery
) as top_level_righy USING (user_id)
ORDER BY user_id DESC
LIMIT 1;
-- a subquery in WHERE clause with
-- a tree with [Q1.1 JOIN Q1.2 UNION ALL Q2.1 JOIN Q2.2] JOIN [Q3.1 JOIN Q3.2 UNION ALL Q4.1 JOIN Q4.2]
-- can be pushed down with FROM tree consisting of JOINs/UNION ALLs
SELECT * FROM
users_table_part u1
JOIN
events_table_part e1 USING (user_id)
JOIN
users_table_part u2 USING (user_id)
JOIN
(SELECT * FROM users_table_part UNION ALL SELECT * FROM events_table_part) as foo USING (user_id)
WHERE user_id IN
(SELECT user_id FROM (
SELECT * FROM
(
(((SELECT * FROM users_table_part JOIN events_table_part USING (user_id)) UNION ALL (SELECT * FROM users_table_part JOIN events_table_part USING (user_id))) as l1
JOIN
((SELECT * FROM users_table_part JOIN events_table_part USING (user_id)) UNION ALL (SELECT * FROM users_table_part JOIN events_table_part USING (user_id))) as l2
USING(user_id))
) as left_subquery
UNION ALL
SELECT * FROM
(
(((SELECT * FROM users_table_part JOIN events_table_part USING (user_id)) UNION ALL (SELECT * FROM users_table_part JOIN events_table_part USING (user_id))) as l1
JOIN
((SELECT * FROM users_table_part JOIN events_table_part USING (user_id)) UNION ALL (SELECT * FROM users_table_part JOIN events_table_part USING (user_id))) as l2
USING(user_id))
) as right_subquery
) as top_level_left
JOIN
(
SELECT * FROM
(
(((SELECT * FROM users_table_part JOIN events_table_part USING (user_id)) UNION ALL (SELECT * FROM users_table_part JOIN events_table_part USING (user_id))) as l1
JOIN
((SELECT * FROM users_table_part JOIN events_table_part USING (user_id)) UNION ALL (SELECT * FROM users_table_part JOIN events_table_part USING (user_id))) as l2
USING(user_id))
) as left_subquery
UNION ALL
SELECT * FROM
(
(((SELECT * FROM users_table_part JOIN events_table_part USING (user_id)) UNION ALL (SELECT * FROM users_table_part JOIN events_table_part USING (user_id))) as l1
JOIN
((SELECT * FROM users_table_part JOIN events_table_part USING (user_id)) UNION ALL (SELECT * FROM users_table_part JOIN events_table_part USING (user_id))) as l2
USING(user_id))
) as right_subquery
) as top_level_righy USING (user_id)
ORDER BY user_id DESC
) ORDER BY 1 LIMIT 1;
---------------------------------------------------------------------------
------------ The following tests ensure that we do not accidentally pushdown
------------ queries involving UNION ALL queries if the distribution keys do
------------ not match or any JOIN is not on the distribution key
------------ We used the queries that are defined above
---------------------------------------------------------------------------
RESET client_min_messages;
SELECT public.explain_has_distributed_subplan($$
EXPLAIN SELECT * FROM ((SELECT 1 FROM events_table_part) UNION ALL (SELECT 1 FROM events_table_part)) u;$$);
SELECT public.explain_has_distributed_subplan($$
EXPLAIN SELECT * FROM ((SELECT random() FROM events_table_part) UNION ALL (SELECT user_id FROM events_table_part)) u;$$);
SELECT public.explain_has_distributed_subplan($$
EXPLAIN SELECT * FROM ((SELECT user_id FROM events_table_part) UNION ALL (SELECT user_id - 1 FROM events_table_part)) u;$$);
SELECT public.explain_has_distributed_subplan($$
EXPLAIN SELECT * FROM ((SELECT user_id FROM events_table_part) UNION ALL (SELECT user_id - 1 as user_id FROM events_table_part)) u
JOIN users_table_part USING(user_id);$$);
SELECT public.explain_has_distributed_subplan($$
EXPLAIN SELECT * FROM
(
SELECT events_table_part.value_1 FROM users_table_part JOIN events_table_part USING (user_id)
UNION ALL
SELECT events_table_part.value_1 FROM users_table_part JOIN events_table_part USING (user_id)
) as bar;$$);
SELECT public.explain_has_distributed_subplan($$
EXPLAIN SELECT COUNT(*)
FROM
(SELECT user_id AS user_id
FROM
(SELECT value_1 AS user_id
FROM users_table_part
UNION ALL SELECT user_id AS user_id
FROM users_table_part) AS bar
UNION ALL SELECT user_id AS user_id
FROM users_table_part) AS fool$$);
SELECT public.explain_has_distributed_subplan($$
EXPLAIN SELECT COUNT(*)
FROM
(SELECT user_id AS user_id
FROM
(SELECT count(*) AS user_id
FROM users_table_part GROUP BY user_id
UNION ALL SELECT user_id AS user_id
FROM users_table_part) AS bar
UNION ALL SELECT user_id AS user_id
FROM users_table_part) AS fool$$);
SELECT public.explain_has_distributed_subplan($$
EXPLAIN SELECT * FROM
(
SELECT * FROM (SELECT * FROM (SELECT * FROM (SELECT * FROM (SELECT * FROM (SELECT value_1, user_id FROM users_table_part) as level_5) as level_4) as level_3) as level_2) as level_1
UNION ALL
SELECT * FROM (SELECT * FROM (SELECT * FROM (SELECT * FROM (SELECT * FROM (SELECT user_id, value_1 FROM users_table_part) as level_5) as level_4) as level_3) as level_2) as level_1
) as top_level$$);
SELECT public.explain_has_distributed_subplan($$
EXPLAIN SELECT user_id FROM
(
SELECT *,random() FROM (SELECT *,random() FROM (SELECT *,random() FROM (SELECT *,random() FROM (SELECT *,random() FROM (SELECT value_1 as user_id,random() FROM users_table_part) as level_5) as level_4) as level_3) as level_2) as level_1
UNION ALL
SELECT *,random() FROM (SELECT *,random() FROM (SELECT *,random() FROM (SELECT *,random() FROM (SELECT *,random() FROM (SELECT user_id,random() FROM users_table_part) as level_5) as level_4) as level_3) as level_2) as level_1
) as top_level
$$);
SELECT public.explain_has_distributed_subplan($$
EXPLAIN SELECT * FROM
(
(((SELECT users_table_part.value_1 as user_id FROM users_table_part JOIN events_table_part USING (user_id)) UNION ALL (SELECT users_table_part.user_id as user_id FROM users_table_part JOIN events_table_part USING (user_id))) as l1
JOIN
((SELECT users_table_part.value_1 as user_id FROM users_table_part JOIN events_table_part USING (user_id)) UNION ALL (SELECT users_table_part.user_id as user_id FROM users_table_part JOIN events_table_part USING (user_id))) as l2
USING(user_id))
) as left_subquery
UNION ALL
SELECT * FROM
(
(((SELECT users_table_part.value_1 as user_id FROM users_table_part JOIN events_table_part USING (user_id)) UNION ALL (SELECT users_table_part.user_id as user_id FROM users_table_part JOIN events_table_part USING (user_id))) as l1
JOIN
((SELECT users_table_part.value_1 as user_id FROM users_table_part JOIN events_table_part USING (user_id)) UNION ALL (SELECT users_table_part.user_id as user_id FROM users_table_part JOIN events_table_part USING (user_id))) as l2
USING(user_id))
) as right_subquery
$$);
SELECT public.explain_has_distributed_subplan($$
EXPLAIN SELECT * FROM
(
SELECT * FROM (SELECT * FROM (SELECT * FROM (SELECT * FROM (SELECT * FROM (SELECT user_id, value_1 FROM users_table_part UNION ALL SELECT user_id, value_1 FROM users_table_part) as level_5) as level_4) as level_3) as level_2) as level_1
UNION ALL
SELECT * FROM (SELECT * FROM (SELECT * FROM (SELECT * FROM (SELECT * FROM (SELECT value_1, user_id FROM users_table_part UNION ALL SELECT value_1, user_id FROM users_table_part) as level_5) as level_4) as level_3) as level_2) as level_1
) as top_level
JOIN
events_table_part USING(user_id)
ORDER BY 1 DESC LIMIT 3;
$$);
-- we can pushdown UNION ALL queries that are correlated and exists
-- on the SELECT clause
SELECT public.explain_has_distributed_subplan($$
EXPLAIN
SELECT
(SELECT count(*) FROM users_table_part WHERE user_id = e.user_id
UNION ALL
SELECT count(*) FROM users_table_part WHERE user_id = e.user_id)
FROM
(SELECT * FROM users_table_part UNION ALL SELECT * FROM users_table_part) as e;
$$);
-- even if the UNION ALL is not on the distribution key
-- it is safe to pushdown the query because all tables are joined
-- on the distribution keys
SELECT public.explain_has_distributed_subplan($$
EXPLAIN
SELECT
(SELECT user_id FROM users_table_part WHERE user_id = e.user_id
UNION ALL
SELECT value_1 FROM users_table_part WHERE user_id = e.user_id)
FROM
(SELECT * FROM users_table_part UNION ALL SELECT * FROM users_table_part) as e;
$$);
-- but if the join is not on the distribution key
-- Citus throws an error
EXPLAIN
SELECT
(SELECT user_id FROM users_table_part WHERE user_id = e.value_1
UNION ALL
SELECT user_id FROM users_table_part WHERE user_id = e.value_1)
FROM
(SELECT * FROM users_table_part) as e;
-- correlated subquery should be able to pushdown
SELECT public.explain_has_distributed_subplan($$
EXPLAIN
SELECT * FROM
users_table_part e JOIN LATERAL
(SELECT value_1 FROM users_table_part WHERE user_id = e.user_id
UNION ALL
SELECT value_1 FROM users_table_part WHERE user_id = e.user_id) as foo ON (true);
$$);
-- correlated subquery should be able to pushdown
SELECT public.explain_has_distributed_subplan($$
EXPLAIN
SELECT
(SELECT
avg(count)
FROM
(SELECT count(*) as count from users_table_part where users_table_part.user_id = u_low.user_id
UNION ALL
SELECT count(*) from users_table_part where users_table_part.user_id = u_low.user_id) b)
FROM users_table_part u_low;
$$);
-- we cannot pushdown if one side of the UNION ALL
-- is a reference table
SELECT public.explain_has_distributed_subplan($$
EXPLAIN
SELECT *
FROM
(SELECT *
FROM events_table_ref
UNION ALL SELECT events_table_ref.*
FROM events_table_part
JOIN events_table_ref USING(user_id)) AS foo
JOIN users_table_part USING(user_id)
LIMIT 1;
$$);
-- we cannot pushdown if one side of the UNION ALL
-- is a local table
SELECT public.explain_has_distributed_subplan($$
EXPLAIN
SELECT *
FROM
(SELECT *
FROM events_table_local
UNION ALL SELECT events_table_local.*
FROM events_table_part
JOIN events_table_local USING(user_id)) AS foo
JOIN users_table_part USING(user_id)
LIMIT 1;
$$);
RESET client_min_messages;
DROP SCHEMA union_pushdown CASCADE;