Use translated vars in postgres 13 as well (#4746)

* Use translated vars in postgres 13 as well

Postgres 13 removed translated vars, so we had special logic for pg 13.
However, that logic had a bug, so now we copy the translated vars before
postgres deletes them. This also simplifies the logic.

* fix rtoffset with pg >= 13
pull/4751/head
SaitTalhaNisanci 2021-02-26 19:41:29 +03:00 committed by GitHub
parent 85c382a63b
commit feee25dfbd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 738 additions and 68 deletions

View File

@ -49,6 +49,7 @@
#include "executor/executor.h"
#include "nodes/makefuncs.h"
#include "nodes/nodeFuncs.h"
#include "nodes/pg_list.h"
#include "parser/parsetree.h"
#include "parser/parse_type.h"
#if PG_VERSION_NUM >= PG_VERSION_12
@ -98,6 +99,7 @@ static PlannedStmt * FinalizeNonRouterPlan(PlannedStmt *localPlan,
DistributedPlan *distributedPlan,
CustomScan *customScan);
static PlannedStmt * FinalizeRouterPlan(PlannedStmt *localPlan, CustomScan *customScan);
static AppendRelInfo * FindTargetAppendRelInfo(PlannerInfo *root, int relationRteIndex);
static List * makeTargetListFromCustomScanList(List *custom_scan_tlist);
static List * makeCustomScanTargetlistFromExistingTargetList(List *existingTargetlist);
static int32 BlessRecordExpressionList(List *exprs);
@ -124,6 +126,7 @@ static PlannedStmt * PlanFastPathDistributedStmt(DistributedPlanningContext *pla
static PlannedStmt * PlanDistributedStmt(DistributedPlanningContext *planContext,
int rteIdCounter);
static RTEListProperties * GetRTEListProperties(List *rangeTableList);
static List * TranslatedVars(PlannerInfo *root, int relationIndex);
/* Distributed planner hook */
@ -1814,6 +1817,8 @@ multi_relation_restriction_hook(PlannerInfo *root, RelOptInfo *relOptInfo,
/* see comments on GetVarFromAssignedParam() */
relationRestriction->outerPlanParamsList = OuterPlanParamsList(root);
relationRestriction->translatedVars = TranslatedVars(root,
relationRestriction->index);
RelationRestrictionContext *relationRestrictionContext =
plannerRestrictionContext->relationRestrictionContext;
@ -1837,6 +1842,61 @@ multi_relation_restriction_hook(PlannerInfo *root, RelOptInfo *relOptInfo,
}
/*
 * TranslatedVars returns a deep copy of the translated_vars list that belongs
 * to the AppendRelInfo whose child rel is the given relation index.
 *
 * NIL is returned when the planner root has no append rel list, or when no
 * AppendRelInfo in that list matches the relation index.
 */
static List *
TranslatedVars(PlannerInfo *root, int relationIndex)
{
	/* nothing to copy when the planner root has no append rels at all */
	if (root->append_rel_list == NIL)
	{
		return NIL;
	}

	AppendRelInfo *matchingAppendRelInfo =
		FindTargetAppendRelInfo(root, relationIndex);
	if (matchingAppendRelInfo == NULL)
	{
		return NIL;
	}

	/*
	 * postgres deletes translated_vars after pg13, so every entry is deep
	 * copied here instead of keeping a pointer to the original list.
	 */
	List *copiedVars = NIL;
	ListCell *translatedVarCell = NULL;
	foreach(translatedVarCell, matchingAppendRelInfo->translated_vars)
	{
		Node *originalNode = (Node *) lfirst(translatedVarCell);

		copiedVars = lappend(copiedVars, copyObject(originalNode));
	}

	return copiedVars;
}
/*
 * FindTargetAppendRelInfo walks root->append_rel_list and returns the first
 * AppendRelInfo whose child_relid equals the given relation rte index.
 * It returns NULL when no entry matches.
 */
static AppendRelInfo *
FindTargetAppendRelInfo(PlannerInfo *root, int relationRteIndex)
{
	ListCell *appendRelCell = NULL;

	/* the entries describe the queries that are part of UNION ALL subselects */
	foreach(appendRelCell, root->append_rel_list)
	{
		AppendRelInfo *candidate = (AppendRelInfo *) lfirst(appendRelCell);

		/*
		 * Only the child rel that equals the relation under investigation
		 * is of interest. child_relid is compared as is, without computing
		 * an offset, because postgres adds an offset to child_relid and
		 * parent_relid only after calling multi_relation_restriction_hook.
		 */
		if (candidate->child_relid != relationRteIndex)
		{
			continue;
		}

		return candidate;
	}

	return NULL;
}
/*
* AdjustReadIntermediateResultCost adjusts the row count and total cost
* of a read_intermediate_result call based on the file size.

View File

@ -83,7 +83,8 @@ typedef struct AttributeEquivalenceClassMember
static bool ContextContainsLocalRelation(RelationRestrictionContext *restrictionContext);
static Var * FindUnionAllVar(PlannerInfo *root, List *appendRelList, Oid relationOid,
static int RangeTableOffsetCompat(PlannerInfo *root, AppendRelInfo *appendRelInfo);
static Var * FindUnionAllVar(PlannerInfo *root, List *translatedVars, Oid relationOid,
Index relationRteIndex, Index *partitionKeyIndex);
static bool ContainsMultipleDistributedRelations(PlannerRestrictionContext *
plannerRestrictionContext);
@ -156,9 +157,12 @@ static JoinRestrictionContext * FilterJoinRestrictionContext(
static bool RangeTableArrayContainsAnyRTEIdentities(RangeTblEntry **rangeTableEntries, int
rangeTableArrayLength, Relids
queryRteIdentities);
static int RangeTableOffsetCompat(PlannerInfo *root, AppendRelInfo *appendRelInfo);
static Relids QueryRteIdentities(Query *queryTree);
#if PG_VERSION_NUM >= PG_VERSION_13
static int ParentCountPriorToAppendRel(List *appendRelList, AppendRelInfo *appendRelInfo);
#endif
/*
* AllDistributionKeysInQueryAreEqual returns true if either
* (i) there exists join in the query and all relations joined on their
@ -279,7 +283,8 @@ SafeToPushdownUnionSubquery(PlannerRestrictionContext *plannerRestrictionContext
*/
if (appendRelList != NULL)
{
varToBeAdded = FindUnionAllVar(relationPlannerRoot, appendRelList,
varToBeAdded = FindUnionAllVar(relationPlannerRoot,
relationRestriction->translatedVars,
relationRestriction->relationId,
relationRestriction->index,
&partitionKeyIndex);
@ -373,63 +378,65 @@ SafeToPushdownUnionSubquery(PlannerRestrictionContext *plannerRestrictionContext
}
/*
 * RangeTableOffsetCompat computes the range table offset (in
 * glob->finalrtable) that applies to the given appendRelInfo.
 *
 * The offset is only computed for PG >= 13; for older versions the function
 * always returns 0.
 */
static int
RangeTableOffsetCompat(PlannerInfo *root, AppendRelInfo *appendRelInfo)
{
#if PG_VERSION_NUM >= PG_VERSION_13

	/*
	 * Inheritance parents that precede the parent of this append rel info
	 * have to be skipped, hence one less than the number of parents counted
	 * by ParentCountPriorToAppendRel.
	 */
	int parentsLeftToSkip =
		ParentCountPriorToAppendRel(root->append_rel_list, appendRelInfo) - 1;

	/* scan the rte array, starting at index 1, for the wanted inh entry */
	int rteIndex = 1;
	while (rteIndex < root->simple_rel_array_size)
	{
		RangeTblEntry *currentRte = root->simple_rte_array[rteIndex];

		if (currentRte->inh)
		{
			if (parentsLeftToSkip <= 0)
			{
				/* this is the inh entry that belongs to the append rel info */
				break;
			}

			/* an earlier parent, skip it and keep scanning */
			parentsLeftToSkip--;
		}

		rteIndex++;
	}

	int indexInRtable = rteIndex - 1;

	/*
	 * Postgres adds the global rte array size to parent_relid as an offset.
	 * Here we do the reverse operation: Commit on postgres side:
	 * 6ef77cf46e81f45716ec981cb08781d426181378
	 */
	int parentRelIndex = appendRelInfo->parent_relid - 1;

	return parentRelIndex - indexInRtable;
#else
	return 0;
#endif
}
/*
* FindUnionAllVar finds the variable used in union all for the side that has
* relationRteIndex as its index and the same varattno as the partition key of
* the given relation with relationOid.
*/
static Var *
FindUnionAllVar(PlannerInfo *root, List *appendRelList, Oid relationOid,
FindUnionAllVar(PlannerInfo *root, List *translatedVars, Oid relationOid,
Index relationRteIndex, Index *partitionKeyIndex)
{
ListCell *appendRelCell = NULL;
AppendRelInfo *targetAppendRelInfo = NULL;
AttrNumber childAttrNumber = 0;
*partitionKeyIndex = 0;
/* iterate on the queries that are part of UNION ALL subselects */
foreach(appendRelCell, appendRelList)
{
AppendRelInfo *appendRelInfo = (AppendRelInfo *) lfirst(appendRelCell);
int rtoffset = RangeTableOffsetCompat(root, appendRelInfo);
/*
* We're only interested in the child rel that is equal to the
* relation we're investigating.
*/
if (appendRelInfo->child_relid - rtoffset == relationRteIndex)
{
targetAppendRelInfo = appendRelInfo;
break;
}
}
if (!targetAppendRelInfo)
{
return NULL;
}
Var *relationPartitionKey = DistPartitionKeyOrError(relationOid);
#if PG_VERSION_NUM >= PG_VERSION_13
for (; childAttrNumber < targetAppendRelInfo->num_child_cols; childAttrNumber++)
{
int curAttNo = targetAppendRelInfo->parent_colnos[childAttrNumber];
if (curAttNo == relationPartitionKey->varattno)
{
*partitionKeyIndex = (childAttrNumber + 1);
int rtoffset = RangeTableOffsetCompat(root, targetAppendRelInfo);
relationPartitionKey->varno = targetAppendRelInfo->child_relid - rtoffset;
return relationPartitionKey;
}
}
#else
AttrNumber childAttrNumber = 0;
*partitionKeyIndex = 0;
ListCell *translatedVarCell;
List *translaterVars = targetAppendRelInfo->translated_vars;
foreach(translatedVarCell, translaterVars)
foreach(translatedVarCell, translatedVars)
{
Node *targetNode = (Node *) lfirst(translatedVarCell);
@ -449,7 +456,6 @@ FindUnionAllVar(PlannerInfo *root, List *appendRelList, Oid relationOid,
return targetVar;
}
}
#endif
return NULL;
}
@ -1387,31 +1393,32 @@ AddUnionAllSetOperationsToAttributeEquivalenceClass(AttributeEquivalenceClass **
}
#if PG_VERSION_NUM >= PG_VERSION_13
/*
* RangeTableOffsetCompat returns the range table offset(in glob->finalrtable) for the appendRelInfo.
* For PG < 13 this is a no op.
* ParentCountPriorToAppendRel returns the number of parents that come before
* the given append rel info.
*/
static int
RangeTableOffsetCompat(PlannerInfo *root, AppendRelInfo *appendRelInfo)
ParentCountPriorToAppendRel(List *appendRelList, AppendRelInfo *targetAppendRelInfo)
{
#if PG_VERSION_NUM >= PG_VERSION_13
int i = 1;
for (; i < root->simple_rel_array_size; i++)
int targetParentIndex = targetAppendRelInfo->parent_relid;
Bitmapset *parent_ids = NULL;
AppendRelInfo *appendRelInfo = NULL;
foreach_ptr(appendRelInfo, appendRelList)
{
RangeTblEntry *rte = root->simple_rte_array[i];
if (rte->inh)
int curParentIndex = appendRelInfo->parent_relid;
if (curParentIndex <= targetParentIndex)
{
break;
parent_ids = bms_add_member(parent_ids, curParentIndex);
}
}
int indexInRtable = (i - 1);
return appendRelInfo->parent_relid - 1 - (indexInRtable);
#else
return 0;
#endif
return bms_num_members(parent_ids);
}
#endif
/*
* AddUnionSetOperationsToAttributeEquivalenceClass recursively iterates on all the
* setOperations and adds each corresponding target entry to the given equivalence

View File

@ -67,6 +67,9 @@ typedef struct RelationRestriction
/* list of RootPlanParams for all outer nodes */
List *outerPlanParamsList;
/* list of translated vars, copied from postgres because postgres deletes the original */
List *translatedVars;
} RelationRestriction;
typedef struct JoinRestrictionContext

View File

@ -33,7 +33,6 @@ extern List * GenerateAllAttributeEquivalences(PlannerRestrictionContext *
plannerRestrictionContext);
extern uint32 UniqueRelationCount(RelationRestrictionContext *restrictionContext,
CitusTableType tableType);
extern List * DistributedRelationIdList(Query *query);
extern PlannerRestrictionContext * FilterPlannerRestrictionForQuery(
PlannerRestrictionContext *plannerRestrictionContext,

View File

@ -191,8 +191,8 @@ s/relation with OID [0-9]+ does not exist/relation with OID XXXX does not exist/
# ignore JIT related messages
/^DEBUG: probing availability of JIT.*/d
/^DEBUG: provider not available, disabling JIT for current session.*/d
/^DEBUG: time to inline:.*/d
/^DEBUG: successfully loaded JIT.*/d
# ignore timing statistics for VACUUM VERBOSE
/CPU: user: .*s, system: .*s, elapsed: .*s/d

View File

@ -829,6 +829,57 @@ DEBUG: Router planner cannot handle multi-shard select queries
DEBUG: Router planner cannot handle multi-shard select queries
ERROR: cannot compute aggregate (distinct)
DETAIL: table partitioning is unsuitable for aggregate (distinct)
/* these are not safe to push down as the partition key index is different */
SELECT COUNT(*) FROM ((SELECT x,y FROM test) UNION ALL (SELECT y,x FROM test)) u;
DEBUG: Router planner cannot handle multi-shard select queries
DEBUG: Router planner cannot handle multi-shard select queries
DEBUG: generating subplan XXX_1 for subquery SELECT x, y FROM recursive_union.test
DEBUG: Router planner cannot handle multi-shard select queries
DEBUG: generating subplan XXX_2 for subquery SELECT y, x FROM recursive_union.test
DEBUG: Creating router plan
DEBUG: generating subplan XXX_3 for subquery SELECT intermediate_result.x, intermediate_result.y FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(x integer, y integer) UNION ALL SELECT intermediate_result.y, intermediate_result.x FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(y integer, x integer)
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT intermediate_result.x, intermediate_result.y FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(x integer, y integer)) u
DEBUG: Creating router plan
count
---------------------------------------------------------------------
4
(1 row)
/* this is safe to push down since the partition key index is the same */
SELECT COUNT(*) FROM (SELECT x,y FROM test UNION ALL SELECT x,y FROM test) foo;
DEBUG: Router planner cannot handle multi-shard select queries
count
---------------------------------------------------------------------
4
(1 row)
SELECT COUNT(*) FROM
((SELECT x,y FROM test UNION ALL SELECT x,y FROM test)
UNION ALL
(SELECT x,y FROM test UNION ALL SELECT x,y FROM test)) foo;
DEBUG: Router planner cannot handle multi-shard select queries
count
---------------------------------------------------------------------
8
(1 row)
SELECT COUNT(*)
FROM
(SELECT user_id AS user_id
FROM
(SELECT x AS user_id
FROM test
UNION ALL SELECT x AS user_id
FROM test) AS bar
UNION ALL SELECT x AS user_id
FROM test) AS fool LIMIT 1;
DEBUG: Router planner cannot handle multi-shard select queries
DEBUG: push down of limit count: 1
count
---------------------------------------------------------------------
6
(1 row)
-- one of the leaves is a repartition join
SET citus.enable_repartition_joins TO ON;
-- repartition is recursively planned before the set operation

View File

@ -0,0 +1,293 @@
CREATE SCHEMA union_pushdown;
SET search_path TO union_pushdown;
SET citus.shard_count TO 4;
SET citus.shard_replication_factor TO 1;
CREATE TABLE users_table_part(user_id bigint, value_1 int, value_2 int) PARTITION BY RANGE (value_1);
CREATE TABLE users_table_part_0 PARTITION OF users_table_part FOR VALUES FROM (0) TO (1);
CREATE TABLE users_table_part_1 PARTITION OF users_table_part FOR VALUES FROM (1) TO (2);
CREATE TABLE users_table_part_2 PARTITION OF users_table_part FOR VALUES FROM (2) TO (3);
CREATE TABLE users_table_part_3 PARTITION OF users_table_part FOR VALUES FROM (3) TO (4);
CREATE TABLE users_table_part_4 PARTITION OF users_table_part FOR VALUES FROM (4) TO (5);
CREATE TABLE users_table_part_5 PARTITION OF users_table_part FOR VALUES FROM (5) TO (6);
CREATE TABLE users_table_part_6 PARTITION OF users_table_part FOR VALUES FROM (6) TO (7);
CREATE TABLE users_table_part_7 PARTITION OF users_table_part FOR VALUES FROM (7) TO (8);
CREATE TABLE users_table_part_8 PARTITION OF users_table_part FOR VALUES FROM (8) TO (9);
SELECT create_distributed_table('users_table_part', 'user_id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
INSERT INTO users_table_part SELECT i, i %9, i %50 FROM generate_series(0, 100) i;
CREATE TABLE events_table_part(user_id bigint, value_1 int, value_2 int) PARTITION BY RANGE (value_1);
CREATE TABLE events_table_part_0 PARTITION OF events_table_part FOR VALUES FROM (0) TO (1);
CREATE TABLE events_table_part_1 PARTITION OF events_table_part FOR VALUES FROM (1) TO (2);
CREATE TABLE events_table_part_2 PARTITION OF events_table_part FOR VALUES FROM (2) TO (3);
CREATE TABLE events_table_part_3 PARTITION OF events_table_part FOR VALUES FROM (3) TO (4);
CREATE TABLE events_table_part_4 PARTITION OF events_table_part FOR VALUES FROM (4) TO (5);
CREATE TABLE events_table_part_5 PARTITION OF events_table_part FOR VALUES FROM (5) TO (6);
CREATE TABLE events_table_part_6 PARTITION OF events_table_part FOR VALUES FROM (6) TO (7);
CREATE TABLE events_table_part_7 PARTITION OF events_table_part FOR VALUES FROM (7) TO (8);
CREATE TABLE events_table_part_8 PARTITION OF events_table_part FOR VALUES FROM (8) TO (9);
SELECT create_distributed_table('events_table_part', 'user_id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
INSERT INTO events_table_part SELECT i, i %9, i %50 FROM generate_series(0, 100) i;
set client_min_messages to DEBUG1;
-- a union all query with 2 different levels of UNION ALL
SELECT COUNT(*)
FROM
(SELECT user_id AS user_id
FROM
(SELECT user_id AS user_id
FROM users_table_part
UNION ALL SELECT user_id AS user_id
FROM users_table_part) AS bar
UNION ALL SELECT user_id AS user_id
FROM users_table_part) AS fool LIMIT 1;
DEBUG: push down of limit count: 1
count
---------------------------------------------------------------------
303
(1 row)
-- a union [all] query with 2 different levels of UNION [ALL]
SELECT COUNT(*)
FROM
(SELECT user_id AS user_id
FROM
(SELECT user_id AS user_id
FROM users_table_part
UNION ALL SELECT user_id AS user_id
FROM users_table_part) AS bar
UNION SELECT user_id AS user_id
FROM users_table_part) AS fool LIMIT 1;
DEBUG: push down of limit count: 1
count
---------------------------------------------------------------------
101
(1 row)
-- a union all query with several levels and leaf queries
SELECT DISTINCT user_id
FROM
(SELECT user_id AS user_id
FROM
(SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 1
UNION ALL SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 2) AS bar
UNION ALL SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 3
UNION ALL
SELECT user_id AS user_id
FROM
(SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 4
UNION ALL SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 5) AS bar
UNION ALL
(SELECT user_id AS user_id
FROM
(SELECT DISTINCT user_id AS user_id FROM users_table_part WHERE value_1 = 6
UNION ALL SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 7) AS bar
UNION ALL SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 8)) AS bar
ORDER BY 1 LIMIT 1;
DEBUG: push down of limit count: 1
user_id
---------------------------------------------------------------------
1
(1 row)
-- a union all query with several levels and leaf queries
-- on the partition tables
SELECT DISTINCT user_id
FROM
(SELECT user_id AS user_id
FROM
(SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 1
UNION ALL SELECT user_id AS user_id FROM users_table_part_2 WHERE value_1 = 2) AS bar
UNION ALL SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 3
UNION ALL
SELECT user_id AS user_id
FROM
(SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 4
UNION ALL SELECT user_id AS user_id FROM users_table_part_3 WHERE value_1 = 5) AS bar
UNION ALL
(SELECT user_id AS user_id
FROM
(SELECT DISTINCT user_id AS user_id FROM users_table_part WHERE value_1 = 6
UNION ALL SELECT user_id AS user_id FROM users_table_part_5 WHERE value_1 = 7) AS bar
UNION ALL SELECT user_id AS user_id FROM users_table_part_4 WHERE value_1 = 8)) AS bar
ORDER BY 1 LIMIT 1;
DEBUG: push down of limit count: 1
user_id
---------------------------------------------------------------------
1
(1 row)
-- a union all query with a combine query on the coordinator
-- can still be pushed down
SELECT COUNT(DISTINCT user_id)
FROM
(SELECT user_id AS user_id
FROM
(SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 1
UNION ALL SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 2) AS bar
UNION ALL SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 3
UNION ALL
SELECT user_id AS user_id
FROM
(SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 4
UNION ALL SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 5) AS bar
UNION ALL
(SELECT user_id AS user_id
FROM
(SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 6
UNION ALL SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 7) AS bar
UNION ALL SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 8)) AS bar;
count
---------------------------------------------------------------------
89
(1 row)
-- a union all query with ORDER BY LIMIT
SELECT COUNT(user_id)
FROM
(SELECT user_id AS user_id
FROM
(SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 1
UNION ALL SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 2) AS bar
UNION ALL SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 3
UNION ALL
SELECT user_id AS user_id
FROM
(SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 4
UNION ALL SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 5) AS bar
UNION ALL
(SELECT user_id AS user_id
FROM
(SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 6
UNION ALL SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 7) AS bar
UNION ALL SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 8)) AS bar
ORDER BY 1 DESC LIMIT 10;
DEBUG: push down of limit count: 10
count
---------------------------------------------------------------------
89
(1 row)
-- a union all query where leaf queries have JOINs on distribution keys
-- can be pushded down
SELECT COUNT(user_id)
FROM
(SELECT user_id AS user_id
FROM
(SELECT user_id AS user_id FROM users_table_part JOIN events_table_part USING (user_id) WHERE users_table_part.value_1 = 1
UNION ALL SELECT user_id AS user_id FROM users_table_part JOIN events_table_part USING (user_id) WHERE users_table_part.value_1 = 2) AS bar
UNION ALL SELECT user_id AS user_id FROM users_table_part JOIN events_table_part USING (user_id) WHERE users_table_part.value_1 = 3
UNION ALL
SELECT user_id AS user_id
FROM
(SELECT user_id AS user_id FROM users_table_part JOIN events_table_part USING (user_id) WHERE users_table_part.value_1 = 4
UNION ALL SELECT user_id AS user_id FROM users_table_part JOIN events_table_part USING (user_id) WHERE users_table_part.value_1 = 5) AS bar
UNION ALL
(SELECT user_id AS user_id
FROM
(SELECT user_id AS user_id FROM users_table_part JOIN events_table_part USING (user_id) WHERE users_table_part.value_1 = 6
UNION ALL SELECT user_id AS user_id FROM users_table_part JOIN events_table_part USING (user_id) WHERE users_table_part.value_1 = 7) AS bar
UNION ALL SELECT user_id AS user_id FROM users_table_part JOIN events_table_part USING (user_id) WHERE users_table_part.value_1 = 8 GROUP BY user_id)) AS bar
ORDER BY 1 DESC LIMIT 10;
DEBUG: push down of limit count: 10
count
---------------------------------------------------------------------
89
(1 row)
-- a union all query deep down inside a subquery can still be pushed down
SELECT COUNT(user_id) FROM (
SELECT user_id, random() FROM (
SELECT user_id, random() FROM (
SELECT user_id, random()
FROM
(SELECT user_id AS user_id
FROM
(SELECT user_id AS user_id FROM users_table_part JOIN events_table_part USING (user_id) WHERE users_table_part.value_1 = 1
UNION ALL SELECT user_id AS user_id FROM users_table_part JOIN events_table_part USING (user_id) WHERE users_table_part.value_1 = 2) AS bar
UNION ALL SELECT user_id AS user_id FROM users_table_part JOIN events_table_part USING (user_id) WHERE users_table_part.value_1 = 3
UNION ALL
SELECT user_id AS user_id
FROM
(SELECT user_id AS user_id FROM users_table_part JOIN events_table_part USING (user_id) WHERE users_table_part.value_1 = 4
UNION ALL SELECT user_id AS user_id FROM users_table_part JOIN events_table_part USING (user_id) WHERE users_table_part.value_1 = 5) AS bar
UNION ALL
(SELECT user_id AS user_id
FROM
(SELECT DISTINCT user_id AS user_id FROM users_table_part JOIN events_table_part USING (user_id) WHERE users_table_part.value_1 = 6
UNION ALL SELECT user_id AS user_id FROM users_table_part JOIN events_table_part USING (user_id) WHERE users_table_part.value_1 = 7 AND events_table_part.user_id IN (SELECT user_id FROM users_table_part WHERE users_table_part.value_2 = 3 AND events_table_part.user_id IN (SELECT user_id FROM users_table_part WHERE value_2 = 3))) AS bar
UNION ALL SELECT user_id AS user_id FROM users_table_part JOIN events_table_part USING (user_id) WHERE users_table_part.value_1 = 8 GROUP BY user_id)) AS bar
WHERE user_id < 2000 ) as level_1 ) as level_2 ) as level_3
ORDER BY 1 DESC LIMIT 10;
DEBUG: push down of limit count: 10
count
---------------------------------------------------------------------
78
(1 row)
-- safe to pushdown
SELECT DISTINCT user_id FROM (
SELECT * FROM
(SELECT user_id FROM users_table_part UNION ALL SELECT user_id FROM users_table_part) as foo
JOIN
(SELECT user_id FROM users_table_part UNION ALL SELECT user_id FROM users_table_part) as bar
USING (user_id)
UNION ALL
SELECT * FROM
(SELECT user_id FROM users_table_part UNION ALL SELECT user_id FROM users_table_part) as foo
JOIN
(SELECT user_id FROM users_table_part UNION ALL SELECT user_id FROM users_table_part) as bar
USING (user_id)
) as foo1 ORDER BY 1 LIMIT 1;
DEBUG: push down of limit count: 1
user_id
---------------------------------------------------------------------
0
(1 row)
-- safe to pushdown
SELECT DISTINCT user_id FROM (
SELECT * FROM (
SELECT * FROM
(SELECT user_id FROM users_table_part UNION ALL SELECT user_id FROM users_table_part) as foo
JOIN
(SELECT user_id FROM users_table_part UNION ALL SELECT user_id FROM users_table_part) as bar
USING (user_id)
UNION ALL
SELECT * FROM
(SELECT user_id FROM users_table_part UNION ALL SELECT user_id FROM users_table_part) as foo
JOIN
(SELECT user_id FROM users_table_part UNION ALL SELECT user_id FROM users_table_part) as bar
USING (user_id)) as bar
) as foo1 ORDER BY 1 LIMIT 1;
DEBUG: push down of limit count: 1
user_id
---------------------------------------------------------------------
0
(1 row)
-- safe to pushdown
SELECT DISTINCT user_id FROM
(SELECT user_id FROM users_table_part UNION ALL SELECT user_id FROM users_table_part) as foo
JOIN
(SELECT user_id FROM users_table_part UNION ALL SELECT user_id FROM users_table_part) as bar
USING (user_id)
ORDER BY 1 LIMIT 1;
DEBUG: push down of limit count: 1
user_id
---------------------------------------------------------------------
0
(1 row)
RESET client_min_messages;
DROP SCHEMA union_pushdown CASCADE;
NOTICE: drop cascades to 2 other objects
DETAIL: drop cascades to table users_table_part
drop cascades to table events_table_part

View File

@ -72,7 +72,7 @@ test: multi_create_fdw
# ----------
# Tests for recursive subquery planning
# ----------
# NOTE: The next 6 were in parallel originally, but we got "too many
# NOTE: The next 7 were in parallel originally, but we got "too many
# connection" errors on CI. Requires investigation before doing them in
# parallel again.
test: subquery_basics
@ -80,6 +80,7 @@ test: subquery_local_tables
test: subquery_executors
test: subquery_and_cte
test: set_operations
test: union_pushdown
test: set_operation_and_local_tables
test: subqueries_deep subquery_view subquery_partitioning subqueries_not_supported

View File

@ -74,6 +74,7 @@ SELECT * FROM ((SELECT x, y FROM test) EXCEPT (SELECT y, x FROM test)) u ORDER B
SELECT * FROM ((SELECT * FROM test) EXCEPT (SELECT * FROM ref)) u ORDER BY 1,2;
SELECT * FROM ((SELECT * FROM ref) EXCEPT (SELECT * FROM ref)) u ORDER BY 1,2;
-- unions can even be pushed down within a join
SELECT * FROM ((SELECT * FROM test) UNION (SELECT * FROM test)) u JOIN test USING (x) ORDER BY 1,2;
SELECT * FROM ((SELECT * FROM test) UNION ALL (SELECT * FROM test)) u LEFT JOIN test USING (x) ORDER BY 1,2;
@ -148,6 +149,27 @@ select avg(DISTINCT t.x) FROM ((SELECT avg(DISTINCT y) FROM test GROUP BY x) UNI
-- other agg. distincts are not supported when group by doesn't include partition key
select count(DISTINCT t.x) FROM ((SELECT avg(DISTINCT y) FROM test GROUP BY y) UNION (SELECT avg(DISTINCT y) FROM test GROUP BY y)) as t(x) ORDER BY 1;
/* these are not safe to push down as the partition key index is different */
SELECT COUNT(*) FROM ((SELECT x,y FROM test) UNION ALL (SELECT y,x FROM test)) u;
/* this is safe to push down since the partition key index is the same */
SELECT COUNT(*) FROM (SELECT x,y FROM test UNION ALL SELECT x,y FROM test) foo;
SELECT COUNT(*) FROM
((SELECT x,y FROM test UNION ALL SELECT x,y FROM test)
UNION ALL
(SELECT x,y FROM test UNION ALL SELECT x,y FROM test)) foo;
SELECT COUNT(*)
FROM
(SELECT user_id AS user_id
FROM
(SELECT x AS user_id
FROM test
UNION ALL SELECT x AS user_id
FROM test) AS bar
UNION ALL SELECT x AS user_id
FROM test) AS fool LIMIT 1;
-- one of the leaves is a repartition join
SET citus.enable_repartition_joins TO ON;

View File

@ -0,0 +1,234 @@
-- Test fixture setup: everything below lives in its own schema, which is
-- dropped again at the end of the script.
CREATE SCHEMA union_pushdown;
SET search_path TO union_pushdown;
-- Tables distributed below use 4 shards and a replication factor of 1.
SET citus.shard_count TO 4;
SET citus.shard_replication_factor TO 1;
-- users_table_part is range-partitioned on value_1 with nine partitions,
-- one per integer value 0..8 (each range is [n, n+1)).
CREATE TABLE users_table_part(user_id bigint, value_1 int, value_2 int) PARTITION BY RANGE (value_1);
CREATE TABLE users_table_part_0 PARTITION OF users_table_part FOR VALUES FROM (0) TO (1);
CREATE TABLE users_table_part_1 PARTITION OF users_table_part FOR VALUES FROM (1) TO (2);
CREATE TABLE users_table_part_2 PARTITION OF users_table_part FOR VALUES FROM (2) TO (3);
CREATE TABLE users_table_part_3 PARTITION OF users_table_part FOR VALUES FROM (3) TO (4);
CREATE TABLE users_table_part_4 PARTITION OF users_table_part FOR VALUES FROM (4) TO (5);
CREATE TABLE users_table_part_5 PARTITION OF users_table_part FOR VALUES FROM (5) TO (6);
CREATE TABLE users_table_part_6 PARTITION OF users_table_part FOR VALUES FROM (6) TO (7);
CREATE TABLE users_table_part_7 PARTITION OF users_table_part FOR VALUES FROM (7) TO (8);
CREATE TABLE users_table_part_8 PARTITION OF users_table_part FOR VALUES FROM (8) TO (9);
-- Distribute the partitioned table on user_id.
SELECT create_distributed_table('users_table_part', 'user_id');
-- Seed 101 rows (i = 0..100); value_1 = i % 9 always falls in 0..8, so every
-- row has a matching partition.
INSERT INTO users_table_part SELECT i, i %9, i %50 FROM generate_series(0, 100) i;
-- events_table_part mirrors users_table_part: same columns, same nine
-- value_1 partitions, same distribution column and same seed data.
CREATE TABLE events_table_part(user_id bigint, value_1 int, value_2 int) PARTITION BY RANGE (value_1);
CREATE TABLE events_table_part_0 PARTITION OF events_table_part FOR VALUES FROM (0) TO (1);
CREATE TABLE events_table_part_1 PARTITION OF events_table_part FOR VALUES FROM (1) TO (2);
CREATE TABLE events_table_part_2 PARTITION OF events_table_part FOR VALUES FROM (2) TO (3);
CREATE TABLE events_table_part_3 PARTITION OF events_table_part FOR VALUES FROM (3) TO (4);
CREATE TABLE events_table_part_4 PARTITION OF events_table_part FOR VALUES FROM (4) TO (5);
CREATE TABLE events_table_part_5 PARTITION OF events_table_part FOR VALUES FROM (5) TO (6);
CREATE TABLE events_table_part_6 PARTITION OF events_table_part FOR VALUES FROM (6) TO (7);
CREATE TABLE events_table_part_7 PARTITION OF events_table_part FOR VALUES FROM (7) TO (8);
CREATE TABLE events_table_part_8 PARTITION OF events_table_part FOR VALUES FROM (8) TO (9);
SELECT create_distributed_table('events_table_part', 'user_id');
INSERT INTO events_table_part SELECT i, i %9, i %50 FROM generate_series(0, 100) i;
-- NOTE(review): presumably raised to DEBUG1 so planner debug messages appear
-- in the test output for the queries below; reset at the end of the script.
set client_min_messages to DEBUG1;
-- a union all query with 2 different levels of UNION ALL
SELECT COUNT(*)
FROM
(SELECT user_id AS user_id
FROM
(SELECT user_id AS user_id
FROM users_table_part
UNION ALL SELECT user_id AS user_id
FROM users_table_part) AS bar
UNION ALL SELECT user_id AS user_id
FROM users_table_part) AS fool LIMIT 1;
-- a union [all] query with 2 different levels of UNION [ALL]
SELECT COUNT(*)
FROM
(SELECT user_id AS user_id
FROM
(SELECT user_id AS user_id
FROM users_table_part
UNION ALL SELECT user_id AS user_id
FROM users_table_part) AS bar
UNION SELECT user_id AS user_id
FROM users_table_part) AS fool LIMIT 1;
-- a union all query with several levels and leaf queries
SELECT DISTINCT user_id
FROM
(SELECT user_id AS user_id
FROM
(SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 1
UNION ALL SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 2) AS bar
UNION ALL SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 3
UNION ALL
SELECT user_id AS user_id
FROM
(SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 4
UNION ALL SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 5) AS bar
UNION ALL
(SELECT user_id AS user_id
FROM
(SELECT DISTINCT user_id AS user_id FROM users_table_part WHERE value_1 = 6
UNION ALL SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 7) AS bar
UNION ALL SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 8)) AS bar
ORDER BY 1 LIMIT 1;
-- a union all query with several levels and leaf queries
-- on the partition tables
SELECT DISTINCT user_id
FROM
(SELECT user_id AS user_id
FROM
(SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 1
UNION ALL SELECT user_id AS user_id FROM users_table_part_2 WHERE value_1 = 2) AS bar
UNION ALL SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 3
UNION ALL
SELECT user_id AS user_id
FROM
(SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 4
UNION ALL SELECT user_id AS user_id FROM users_table_part_3 WHERE value_1 = 5) AS bar
UNION ALL
(SELECT user_id AS user_id
FROM
(SELECT DISTINCT user_id AS user_id FROM users_table_part WHERE value_1 = 6
UNION ALL SELECT user_id AS user_id FROM users_table_part_5 WHERE value_1 = 7) AS bar
UNION ALL SELECT user_id AS user_id FROM users_table_part_4 WHERE value_1 = 8)) AS bar
ORDER BY 1 LIMIT 1;
-- a union all query with a combine query on the coordinator
-- can still be pushed down
SELECT COUNT(DISTINCT user_id)
FROM
(SELECT user_id AS user_id
FROM
(SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 1
UNION ALL SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 2) AS bar
UNION ALL SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 3
UNION ALL
SELECT user_id AS user_id
FROM
(SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 4
UNION ALL SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 5) AS bar
UNION ALL
(SELECT user_id AS user_id
FROM
(SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 6
UNION ALL SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 7) AS bar
UNION ALL SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 8)) AS bar;
-- a union all query with ORDER BY LIMIT
SELECT COUNT(user_id)
FROM
(SELECT user_id AS user_id
FROM
(SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 1
UNION ALL SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 2) AS bar
UNION ALL SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 3
UNION ALL
SELECT user_id AS user_id
FROM
(SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 4
UNION ALL SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 5) AS bar
UNION ALL
(SELECT user_id AS user_id
FROM
(SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 6
UNION ALL SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 7) AS bar
UNION ALL SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 8)) AS bar
ORDER BY 1 DESC LIMIT 10;
-- a union all query where leaf queries have JOINs on distribution keys
-- can be pushed down
SELECT COUNT(user_id)
FROM
(SELECT user_id AS user_id
FROM
(SELECT user_id AS user_id FROM users_table_part JOIN events_table_part USING (user_id) WHERE users_table_part.value_1 = 1
UNION ALL SELECT user_id AS user_id FROM users_table_part JOIN events_table_part USING (user_id) WHERE users_table_part.value_1 = 2) AS bar
UNION ALL SELECT user_id AS user_id FROM users_table_part JOIN events_table_part USING (user_id) WHERE users_table_part.value_1 = 3
UNION ALL
SELECT user_id AS user_id
FROM
(SELECT user_id AS user_id FROM users_table_part JOIN events_table_part USING (user_id) WHERE users_table_part.value_1 = 4
UNION ALL SELECT user_id AS user_id FROM users_table_part JOIN events_table_part USING (user_id) WHERE users_table_part.value_1 = 5) AS bar
UNION ALL
(SELECT user_id AS user_id
FROM
(SELECT user_id AS user_id FROM users_table_part JOIN events_table_part USING (user_id) WHERE users_table_part.value_1 = 6
UNION ALL SELECT user_id AS user_id FROM users_table_part JOIN events_table_part USING (user_id) WHERE users_table_part.value_1 = 7) AS bar
UNION ALL SELECT user_id AS user_id FROM users_table_part JOIN events_table_part USING (user_id) WHERE users_table_part.value_1 = 8 GROUP BY user_id)) AS bar
ORDER BY 1 DESC LIMIT 10;
-- a union all query deep down inside a subquery can still be pushed down
SELECT COUNT(user_id) FROM (
SELECT user_id, random() FROM (
SELECT user_id, random() FROM (
SELECT user_id, random()
FROM
(SELECT user_id AS user_id
FROM
(SELECT user_id AS user_id FROM users_table_part JOIN events_table_part USING (user_id) WHERE users_table_part.value_1 = 1
UNION ALL SELECT user_id AS user_id FROM users_table_part JOIN events_table_part USING (user_id) WHERE users_table_part.value_1 = 2) AS bar
UNION ALL SELECT user_id AS user_id FROM users_table_part JOIN events_table_part USING (user_id) WHERE users_table_part.value_1 = 3
UNION ALL
SELECT user_id AS user_id
FROM
(SELECT user_id AS user_id FROM users_table_part JOIN events_table_part USING (user_id) WHERE users_table_part.value_1 = 4
UNION ALL SELECT user_id AS user_id FROM users_table_part JOIN events_table_part USING (user_id) WHERE users_table_part.value_1 = 5) AS bar
UNION ALL
(SELECT user_id AS user_id
FROM
(SELECT DISTINCT user_id AS user_id FROM users_table_part JOIN events_table_part USING (user_id) WHERE users_table_part.value_1 = 6
UNION ALL SELECT user_id AS user_id FROM users_table_part JOIN events_table_part USING (user_id) WHERE users_table_part.value_1 = 7 AND events_table_part.user_id IN (SELECT user_id FROM users_table_part WHERE users_table_part.value_2 = 3 AND events_table_part.user_id IN (SELECT user_id FROM users_table_part WHERE value_2 = 3))) AS bar
UNION ALL SELECT user_id AS user_id FROM users_table_part JOIN events_table_part USING (user_id) WHERE users_table_part.value_1 = 8 GROUP BY user_id)) AS bar
WHERE user_id < 2000 ) as level_1 ) as level_2 ) as level_3
ORDER BY 1 DESC LIMIT 10;
-- safe to push down: a UNION ALL of two joins, each joining two UNION ALL subqueries on user_id
SELECT DISTINCT user_id FROM (
SELECT * FROM
(SELECT user_id FROM users_table_part UNION ALL SELECT user_id FROM users_table_part) as foo
JOIN
(SELECT user_id FROM users_table_part UNION ALL SELECT user_id FROM users_table_part) as bar
USING (user_id)
UNION ALL
SELECT * FROM
(SELECT user_id FROM users_table_part UNION ALL SELECT user_id FROM users_table_part) as foo
JOIN
(SELECT user_id FROM users_table_part UNION ALL SELECT user_id FROM users_table_part) as bar
USING (user_id)
) as foo1 ORDER BY 1 LIMIT 1;
-- safe to push down: same as the previous query, wrapped in one more subquery level
SELECT DISTINCT user_id FROM (
SELECT * FROM (
SELECT * FROM
(SELECT user_id FROM users_table_part UNION ALL SELECT user_id FROM users_table_part) as foo
JOIN
(SELECT user_id FROM users_table_part UNION ALL SELECT user_id FROM users_table_part) as bar
USING (user_id)
UNION ALL
SELECT * FROM
(SELECT user_id FROM users_table_part UNION ALL SELECT user_id FROM users_table_part) as foo
JOIN
(SELECT user_id FROM users_table_part UNION ALL SELECT user_id FROM users_table_part) as bar
USING (user_id)) as bar
) as foo1 ORDER BY 1 LIMIT 1;
-- safe to push down: a single join of two UNION ALL subqueries on user_id
SELECT DISTINCT user_id FROM
(SELECT user_id FROM users_table_part UNION ALL SELECT user_id FROM users_table_part) as foo
JOIN
(SELECT user_id FROM users_table_part UNION ALL SELECT user_id FROM users_table_part) as bar
USING (user_id)
ORDER BY 1 LIMIT 1;
RESET client_min_messages;
DROP SCHEMA union_pushdown CASCADE;