mirror of https://github.com/citusdata/citus.git
Use translated vars in postgres 13 as well (#4746)
* Use translated vars in postgres 13 as well
Postgres 13 removed translated vars with pg 13 so we had a special logic
for pg 13. However it had some bug, so now we copy the translated vars
before postgres deletes it. This also simplifies the logic.
* fix rtoffset with pg >= 13
(cherry picked from commit feee25dfbd
)
pull/5009/head
parent
b355f0d9a2
commit
bfb1ca6d0d
|
@ -49,6 +49,7 @@
|
||||||
#include "executor/executor.h"
|
#include "executor/executor.h"
|
||||||
#include "nodes/makefuncs.h"
|
#include "nodes/makefuncs.h"
|
||||||
#include "nodes/nodeFuncs.h"
|
#include "nodes/nodeFuncs.h"
|
||||||
|
#include "nodes/pg_list.h"
|
||||||
#include "parser/parsetree.h"
|
#include "parser/parsetree.h"
|
||||||
#include "parser/parse_type.h"
|
#include "parser/parse_type.h"
|
||||||
#if PG_VERSION_NUM >= PG_VERSION_12
|
#if PG_VERSION_NUM >= PG_VERSION_12
|
||||||
|
@ -98,6 +99,7 @@ static PlannedStmt * FinalizeNonRouterPlan(PlannedStmt *localPlan,
|
||||||
DistributedPlan *distributedPlan,
|
DistributedPlan *distributedPlan,
|
||||||
CustomScan *customScan);
|
CustomScan *customScan);
|
||||||
static PlannedStmt * FinalizeRouterPlan(PlannedStmt *localPlan, CustomScan *customScan);
|
static PlannedStmt * FinalizeRouterPlan(PlannedStmt *localPlan, CustomScan *customScan);
|
||||||
|
static AppendRelInfo * FindTargetAppendRelInfo(PlannerInfo *root, int relationRteIndex);
|
||||||
static List * makeTargetListFromCustomScanList(List *custom_scan_tlist);
|
static List * makeTargetListFromCustomScanList(List *custom_scan_tlist);
|
||||||
static List * makeCustomScanTargetlistFromExistingTargetList(List *existingTargetlist);
|
static List * makeCustomScanTargetlistFromExistingTargetList(List *existingTargetlist);
|
||||||
static int32 BlessRecordExpressionList(List *exprs);
|
static int32 BlessRecordExpressionList(List *exprs);
|
||||||
|
@ -124,6 +126,7 @@ static PlannedStmt * PlanFastPathDistributedStmt(DistributedPlanningContext *pla
|
||||||
static PlannedStmt * PlanDistributedStmt(DistributedPlanningContext *planContext,
|
static PlannedStmt * PlanDistributedStmt(DistributedPlanningContext *planContext,
|
||||||
int rteIdCounter);
|
int rteIdCounter);
|
||||||
static RTEListProperties * GetRTEListProperties(List *rangeTableList);
|
static RTEListProperties * GetRTEListProperties(List *rangeTableList);
|
||||||
|
static List * TranslatedVars(PlannerInfo *root, int relationIndex);
|
||||||
|
|
||||||
|
|
||||||
/* Distributed planner hook */
|
/* Distributed planner hook */
|
||||||
|
@ -1814,6 +1817,8 @@ multi_relation_restriction_hook(PlannerInfo *root, RelOptInfo *relOptInfo,
|
||||||
|
|
||||||
/* see comments on GetVarFromAssignedParam() */
|
/* see comments on GetVarFromAssignedParam() */
|
||||||
relationRestriction->outerPlanParamsList = OuterPlanParamsList(root);
|
relationRestriction->outerPlanParamsList = OuterPlanParamsList(root);
|
||||||
|
relationRestriction->translatedVars = TranslatedVars(root,
|
||||||
|
relationRestriction->index);
|
||||||
|
|
||||||
RelationRestrictionContext *relationRestrictionContext =
|
RelationRestrictionContext *relationRestrictionContext =
|
||||||
plannerRestrictionContext->relationRestrictionContext;
|
plannerRestrictionContext->relationRestrictionContext;
|
||||||
|
@ -1837,6 +1842,61 @@ multi_relation_restriction_hook(PlannerInfo *root, RelOptInfo *relOptInfo,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* TranslatedVars deep copies the translated vars for the given relation index
|
||||||
|
* if there is any append rel list.
|
||||||
|
*/
|
||||||
|
static List *
|
||||||
|
TranslatedVars(PlannerInfo *root, int relationIndex)
|
||||||
|
{
|
||||||
|
List *translatedVars = NIL;
|
||||||
|
|
||||||
|
if (root->append_rel_list != NIL)
|
||||||
|
{
|
||||||
|
AppendRelInfo *targetAppendRelInfo =
|
||||||
|
FindTargetAppendRelInfo(root, relationIndex);
|
||||||
|
if (targetAppendRelInfo != NULL)
|
||||||
|
{
|
||||||
|
/* postgres deletes translated_vars after pg13, hence we deep copy them here */
|
||||||
|
Node *targetNode = NULL;
|
||||||
|
foreach_ptr(targetNode, targetAppendRelInfo->translated_vars)
|
||||||
|
{
|
||||||
|
translatedVars =
|
||||||
|
lappend(translatedVars, copyObject(targetNode));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return translatedVars;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* FindTargetAppendRelInfo finds the target append rel info for the given
|
||||||
|
* relation rte index.
|
||||||
|
*/
|
||||||
|
static AppendRelInfo *
|
||||||
|
FindTargetAppendRelInfo(PlannerInfo *root, int relationRteIndex)
|
||||||
|
{
|
||||||
|
AppendRelInfo *appendRelInfo = NULL;
|
||||||
|
|
||||||
|
/* iterate on the queries that are part of UNION ALL subselects */
|
||||||
|
foreach_ptr(appendRelInfo, root->append_rel_list)
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
* We're only interested in the child rel that is equal to the
|
||||||
|
* relation we're investigating. Here we don't need to find the offset
|
||||||
|
* because postgres adds an offset to child_relid and parent_relid after
|
||||||
|
* calling multi_relation_restriction_hook.
|
||||||
|
*/
|
||||||
|
if (appendRelInfo->child_relid == relationRteIndex)
|
||||||
|
{
|
||||||
|
return appendRelInfo;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* AdjustReadIntermediateResultCost adjusts the row count and total cost
|
* AdjustReadIntermediateResultCost adjusts the row count and total cost
|
||||||
* of a read_intermediate_result call based on the file size.
|
* of a read_intermediate_result call based on the file size.
|
||||||
|
|
|
@ -83,7 +83,8 @@ typedef struct AttributeEquivalenceClassMember
|
||||||
|
|
||||||
|
|
||||||
static bool ContextContainsLocalRelation(RelationRestrictionContext *restrictionContext);
|
static bool ContextContainsLocalRelation(RelationRestrictionContext *restrictionContext);
|
||||||
static Var * FindUnionAllVar(PlannerInfo *root, List *appendRelList, Oid relationOid,
|
static int RangeTableOffsetCompat(PlannerInfo *root, AppendRelInfo *appendRelInfo);
|
||||||
|
static Var * FindUnionAllVar(PlannerInfo *root, List *translatedVars, Oid relationOid,
|
||||||
Index relationRteIndex, Index *partitionKeyIndex);
|
Index relationRteIndex, Index *partitionKeyIndex);
|
||||||
static bool ContainsMultipleDistributedRelations(PlannerRestrictionContext *
|
static bool ContainsMultipleDistributedRelations(PlannerRestrictionContext *
|
||||||
plannerRestrictionContext);
|
plannerRestrictionContext);
|
||||||
|
@ -156,9 +157,12 @@ static JoinRestrictionContext * FilterJoinRestrictionContext(
|
||||||
static bool RangeTableArrayContainsAnyRTEIdentities(RangeTblEntry **rangeTableEntries, int
|
static bool RangeTableArrayContainsAnyRTEIdentities(RangeTblEntry **rangeTableEntries, int
|
||||||
rangeTableArrayLength, Relids
|
rangeTableArrayLength, Relids
|
||||||
queryRteIdentities);
|
queryRteIdentities);
|
||||||
static int RangeTableOffsetCompat(PlannerInfo *root, AppendRelInfo *appendRelInfo);
|
|
||||||
static Relids QueryRteIdentities(Query *queryTree);
|
static Relids QueryRteIdentities(Query *queryTree);
|
||||||
|
|
||||||
|
#if PG_VERSION_NUM >= PG_VERSION_13
|
||||||
|
static int ParentCountPriorToAppendRel(List *appendRelList, AppendRelInfo *appendRelInfo);
|
||||||
|
#endif
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* AllDistributionKeysInQueryAreEqual returns true if either
|
* AllDistributionKeysInQueryAreEqual returns true if either
|
||||||
* (i) there exists join in the query and all relations joined on their
|
* (i) there exists join in the query and all relations joined on their
|
||||||
|
@ -279,7 +283,8 @@ SafeToPushdownUnionSubquery(PlannerRestrictionContext *plannerRestrictionContext
|
||||||
*/
|
*/
|
||||||
if (appendRelList != NULL)
|
if (appendRelList != NULL)
|
||||||
{
|
{
|
||||||
varToBeAdded = FindUnionAllVar(relationPlannerRoot, appendRelList,
|
varToBeAdded = FindUnionAllVar(relationPlannerRoot,
|
||||||
|
relationRestriction->translatedVars,
|
||||||
relationRestriction->relationId,
|
relationRestriction->relationId,
|
||||||
relationRestriction->index,
|
relationRestriction->index,
|
||||||
&partitionKeyIndex);
|
&partitionKeyIndex);
|
||||||
|
@ -373,63 +378,65 @@ SafeToPushdownUnionSubquery(PlannerRestrictionContext *plannerRestrictionContext
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* RangeTableOffsetCompat returns the range table offset(in glob->finalrtable) for the appendRelInfo.
|
||||||
|
* For PG < 13 this is a no op.
|
||||||
|
*/
|
||||||
|
static int
|
||||||
|
RangeTableOffsetCompat(PlannerInfo *root, AppendRelInfo *appendRelInfo)
|
||||||
|
{
|
||||||
|
#if PG_VERSION_NUM >= PG_VERSION_13
|
||||||
|
int parentCount = ParentCountPriorToAppendRel(root->append_rel_list, appendRelInfo);
|
||||||
|
int skipParentCount = parentCount - 1;
|
||||||
|
|
||||||
|
int i = 1;
|
||||||
|
for (; i < root->simple_rel_array_size; i++)
|
||||||
|
{
|
||||||
|
RangeTblEntry *rte = root->simple_rte_array[i];
|
||||||
|
if (rte->inh)
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
* We skip the previous parents because we want to find the offset
|
||||||
|
* for the given append rel info.
|
||||||
|
*/
|
||||||
|
if (skipParentCount > 0)
|
||||||
|
{
|
||||||
|
skipParentCount--;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
int indexInRtable = (i - 1);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Postgres adds the global rte array size to parent_relid as an offset.
|
||||||
|
* Here we do the reverse operation: Commit on postgres side:
|
||||||
|
* 6ef77cf46e81f45716ec981cb08781d426181378
|
||||||
|
*/
|
||||||
|
int parentRelIndex = appendRelInfo->parent_relid - 1;
|
||||||
|
return parentRelIndex - indexInRtable;
|
||||||
|
#else
|
||||||
|
return 0;
|
||||||
|
#endif
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* FindUnionAllVar finds the variable used in union all for the side that has
|
* FindUnionAllVar finds the variable used in union all for the side that has
|
||||||
* relationRteIndex as its index and the same varattno as the partition key of
|
* relationRteIndex as its index and the same varattno as the partition key of
|
||||||
* the given relation with relationOid.
|
* the given relation with relationOid.
|
||||||
*/
|
*/
|
||||||
static Var *
|
static Var *
|
||||||
FindUnionAllVar(PlannerInfo *root, List *appendRelList, Oid relationOid,
|
FindUnionAllVar(PlannerInfo *root, List *translatedVars, Oid relationOid,
|
||||||
Index relationRteIndex, Index *partitionKeyIndex)
|
Index relationRteIndex, Index *partitionKeyIndex)
|
||||||
{
|
{
|
||||||
ListCell *appendRelCell = NULL;
|
|
||||||
AppendRelInfo *targetAppendRelInfo = NULL;
|
|
||||||
AttrNumber childAttrNumber = 0;
|
|
||||||
|
|
||||||
*partitionKeyIndex = 0;
|
|
||||||
|
|
||||||
/* iterate on the queries that are part of UNION ALL subselects */
|
|
||||||
foreach(appendRelCell, appendRelList)
|
|
||||||
{
|
|
||||||
AppendRelInfo *appendRelInfo = (AppendRelInfo *) lfirst(appendRelCell);
|
|
||||||
|
|
||||||
|
|
||||||
int rtoffset = RangeTableOffsetCompat(root, appendRelInfo);
|
|
||||||
|
|
||||||
/*
|
|
||||||
* We're only interested in the child rel that is equal to the
|
|
||||||
* relation we're investigating.
|
|
||||||
*/
|
|
||||||
if (appendRelInfo->child_relid - rtoffset == relationRteIndex)
|
|
||||||
{
|
|
||||||
targetAppendRelInfo = appendRelInfo;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!targetAppendRelInfo)
|
|
||||||
{
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
Var *relationPartitionKey = DistPartitionKeyOrError(relationOid);
|
Var *relationPartitionKey = DistPartitionKeyOrError(relationOid);
|
||||||
|
|
||||||
#if PG_VERSION_NUM >= PG_VERSION_13
|
AttrNumber childAttrNumber = 0;
|
||||||
for (; childAttrNumber < targetAppendRelInfo->num_child_cols; childAttrNumber++)
|
*partitionKeyIndex = 0;
|
||||||
{
|
|
||||||
int curAttNo = targetAppendRelInfo->parent_colnos[childAttrNumber];
|
|
||||||
if (curAttNo == relationPartitionKey->varattno)
|
|
||||||
{
|
|
||||||
*partitionKeyIndex = (childAttrNumber + 1);
|
|
||||||
int rtoffset = RangeTableOffsetCompat(root, targetAppendRelInfo);
|
|
||||||
relationPartitionKey->varno = targetAppendRelInfo->child_relid - rtoffset;
|
|
||||||
return relationPartitionKey;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
#else
|
|
||||||
ListCell *translatedVarCell;
|
ListCell *translatedVarCell;
|
||||||
List *translaterVars = targetAppendRelInfo->translated_vars;
|
foreach(translatedVarCell, translatedVars)
|
||||||
foreach(translatedVarCell, translaterVars)
|
|
||||||
{
|
{
|
||||||
Node *targetNode = (Node *) lfirst(translatedVarCell);
|
Node *targetNode = (Node *) lfirst(translatedVarCell);
|
||||||
|
|
||||||
|
@ -449,7 +456,6 @@ FindUnionAllVar(PlannerInfo *root, List *appendRelList, Oid relationOid,
|
||||||
return targetVar;
|
return targetVar;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
#endif
|
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1387,31 +1393,32 @@ AddUnionAllSetOperationsToAttributeEquivalenceClass(AttributeEquivalenceClass **
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
#if PG_VERSION_NUM >= PG_VERSION_13
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* RangeTableOffsetCompat returns the range table offset(in glob->finalrtable) for the appendRelInfo.
|
* ParentCountPriorToAppendRel returns the number of parents that come before
|
||||||
* For PG < 13 this is a no op.
|
* the given append rel info.
|
||||||
*/
|
*/
|
||||||
static int
|
static int
|
||||||
RangeTableOffsetCompat(PlannerInfo *root, AppendRelInfo *appendRelInfo)
|
ParentCountPriorToAppendRel(List *appendRelList, AppendRelInfo *targetAppendRelInfo)
|
||||||
{
|
{
|
||||||
#if PG_VERSION_NUM >= PG_VERSION_13
|
int targetParentIndex = targetAppendRelInfo->parent_relid;
|
||||||
int i = 1;
|
Bitmapset *parent_ids = NULL;
|
||||||
for (; i < root->simple_rel_array_size; i++)
|
AppendRelInfo *appendRelInfo = NULL;
|
||||||
|
foreach_ptr(appendRelInfo, appendRelList)
|
||||||
{
|
{
|
||||||
RangeTblEntry *rte = root->simple_rte_array[i];
|
int curParentIndex = appendRelInfo->parent_relid;
|
||||||
if (rte->inh)
|
if (curParentIndex <= targetParentIndex)
|
||||||
{
|
{
|
||||||
break;
|
parent_ids = bms_add_member(parent_ids, curParentIndex);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
int indexInRtable = (i - 1);
|
return bms_num_members(parent_ids);
|
||||||
return appendRelInfo->parent_relid - 1 - (indexInRtable);
|
|
||||||
#else
|
|
||||||
return 0;
|
|
||||||
#endif
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
#endif
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* AddUnionSetOperationsToAttributeEquivalenceClass recursively iterates on all the
|
* AddUnionSetOperationsToAttributeEquivalenceClass recursively iterates on all the
|
||||||
* setOperations and adds each corresponding target entry to the given equivalence
|
* setOperations and adds each corresponding target entry to the given equivalence
|
||||||
|
|
|
@ -67,6 +67,9 @@ typedef struct RelationRestriction
|
||||||
|
|
||||||
/* list of RootPlanParams for all outer nodes */
|
/* list of RootPlanParams for all outer nodes */
|
||||||
List *outerPlanParamsList;
|
List *outerPlanParamsList;
|
||||||
|
|
||||||
|
/* list of translated vars, this is copied from postgres since it gets deleted on postgres*/
|
||||||
|
List *translatedVars;
|
||||||
} RelationRestriction;
|
} RelationRestriction;
|
||||||
|
|
||||||
typedef struct JoinRestrictionContext
|
typedef struct JoinRestrictionContext
|
||||||
|
|
|
@ -33,7 +33,6 @@ extern List * GenerateAllAttributeEquivalences(PlannerRestrictionContext *
|
||||||
plannerRestrictionContext);
|
plannerRestrictionContext);
|
||||||
extern uint32 UniqueRelationCount(RelationRestrictionContext *restrictionContext,
|
extern uint32 UniqueRelationCount(RelationRestrictionContext *restrictionContext,
|
||||||
CitusTableType tableType);
|
CitusTableType tableType);
|
||||||
|
|
||||||
extern List * DistributedRelationIdList(Query *query);
|
extern List * DistributedRelationIdList(Query *query);
|
||||||
extern PlannerRestrictionContext * FilterPlannerRestrictionForQuery(
|
extern PlannerRestrictionContext * FilterPlannerRestrictionForQuery(
|
||||||
PlannerRestrictionContext *plannerRestrictionContext,
|
PlannerRestrictionContext *plannerRestrictionContext,
|
||||||
|
|
|
@ -191,8 +191,8 @@ s/relation with OID [0-9]+ does not exist/relation with OID XXXX does not exist/
|
||||||
# ignore JIT related messages
|
# ignore JIT related messages
|
||||||
/^DEBUG: probing availability of JIT.*/d
|
/^DEBUG: probing availability of JIT.*/d
|
||||||
/^DEBUG: provider not available, disabling JIT for current session.*/d
|
/^DEBUG: provider not available, disabling JIT for current session.*/d
|
||||||
|
/^DEBUG: time to inline:.*/d
|
||||||
|
/^DEBUG: successfully loaded JIT.*/d
|
||||||
|
|
||||||
# ignore timing statistics for VACUUM VERBOSE
|
# ignore timing statistics for VACUUM VERBOSE
|
||||||
/CPU: user: .*s, system: .*s, elapsed: .*s/d
|
/CPU: user: .*s, system: .*s, elapsed: .*s/d
|
||||||
|
|
|
@ -829,6 +829,57 @@ DEBUG: Router planner cannot handle multi-shard select queries
|
||||||
DEBUG: Router planner cannot handle multi-shard select queries
|
DEBUG: Router planner cannot handle multi-shard select queries
|
||||||
ERROR: cannot compute aggregate (distinct)
|
ERROR: cannot compute aggregate (distinct)
|
||||||
DETAIL: table partitioning is unsuitable for aggregate (distinct)
|
DETAIL: table partitioning is unsuitable for aggregate (distinct)
|
||||||
|
/* these are not safe to push down as the partition key index is different */
|
||||||
|
SELECT COUNT(*) FROM ((SELECT x,y FROM test) UNION ALL (SELECT y,x FROM test)) u;
|
||||||
|
DEBUG: Router planner cannot handle multi-shard select queries
|
||||||
|
DEBUG: Router planner cannot handle multi-shard select queries
|
||||||
|
DEBUG: generating subplan XXX_1 for subquery SELECT x, y FROM recursive_union.test
|
||||||
|
DEBUG: Router planner cannot handle multi-shard select queries
|
||||||
|
DEBUG: generating subplan XXX_2 for subquery SELECT y, x FROM recursive_union.test
|
||||||
|
DEBUG: Creating router plan
|
||||||
|
DEBUG: generating subplan XXX_3 for subquery SELECT intermediate_result.x, intermediate_result.y FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(x integer, y integer) UNION ALL SELECT intermediate_result.y, intermediate_result.x FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(y integer, x integer)
|
||||||
|
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT intermediate_result.x, intermediate_result.y FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(x integer, y integer)) u
|
||||||
|
DEBUG: Creating router plan
|
||||||
|
count
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
4
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
/* this is safe to push down since the partition key index is the same */
|
||||||
|
SELECT COUNT(*) FROM (SELECT x,y FROM test UNION ALL SELECT x,y FROM test) foo;
|
||||||
|
DEBUG: Router planner cannot handle multi-shard select queries
|
||||||
|
count
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
4
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT COUNT(*) FROM
|
||||||
|
((SELECT x,y FROM test UNION ALL SELECT x,y FROM test)
|
||||||
|
UNION ALL
|
||||||
|
(SELECT x,y FROM test UNION ALL SELECT x,y FROM test)) foo;
|
||||||
|
DEBUG: Router planner cannot handle multi-shard select queries
|
||||||
|
count
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
8
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT COUNT(*)
|
||||||
|
FROM
|
||||||
|
(SELECT user_id AS user_id
|
||||||
|
FROM
|
||||||
|
(SELECT x AS user_id
|
||||||
|
FROM test
|
||||||
|
UNION ALL SELECT x AS user_id
|
||||||
|
FROM test) AS bar
|
||||||
|
UNION ALL SELECT x AS user_id
|
||||||
|
FROM test) AS fool LIMIT 1;
|
||||||
|
DEBUG: Router planner cannot handle multi-shard select queries
|
||||||
|
DEBUG: push down of limit count: 1
|
||||||
|
count
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
6
|
||||||
|
(1 row)
|
||||||
|
|
||||||
-- one of the leaves is a repartition join
|
-- one of the leaves is a repartition join
|
||||||
SET citus.enable_repartition_joins TO ON;
|
SET citus.enable_repartition_joins TO ON;
|
||||||
-- repartition is recursively planned before the set operation
|
-- repartition is recursively planned before the set operation
|
||||||
|
|
|
@ -0,0 +1,293 @@
|
||||||
|
CREATE SCHEMA union_pushdown;
|
||||||
|
SET search_path TO union_pushdown;
|
||||||
|
SET citus.shard_count TO 4;
|
||||||
|
SET citus.shard_replication_factor TO 1;
|
||||||
|
CREATE TABLE users_table_part(user_id bigint, value_1 int, value_2 int) PARTITION BY RANGE (value_1);
|
||||||
|
CREATE TABLE users_table_part_0 PARTITION OF users_table_part FOR VALUES FROM (0) TO (1);
|
||||||
|
CREATE TABLE users_table_part_1 PARTITION OF users_table_part FOR VALUES FROM (1) TO (2);
|
||||||
|
CREATE TABLE users_table_part_2 PARTITION OF users_table_part FOR VALUES FROM (2) TO (3);
|
||||||
|
CREATE TABLE users_table_part_3 PARTITION OF users_table_part FOR VALUES FROM (3) TO (4);
|
||||||
|
CREATE TABLE users_table_part_4 PARTITION OF users_table_part FOR VALUES FROM (4) TO (5);
|
||||||
|
CREATE TABLE users_table_part_5 PARTITION OF users_table_part FOR VALUES FROM (5) TO (6);
|
||||||
|
CREATE TABLE users_table_part_6 PARTITION OF users_table_part FOR VALUES FROM (6) TO (7);
|
||||||
|
CREATE TABLE users_table_part_7 PARTITION OF users_table_part FOR VALUES FROM (7) TO (8);
|
||||||
|
CREATE TABLE users_table_part_8 PARTITION OF users_table_part FOR VALUES FROM (8) TO (9);
|
||||||
|
SELECT create_distributed_table('users_table_part', 'user_id');
|
||||||
|
create_distributed_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
INSERT INTO users_table_part SELECT i, i %9, i %50 FROM generate_series(0, 100) i;
|
||||||
|
CREATE TABLE events_table_part(user_id bigint, value_1 int, value_2 int) PARTITION BY RANGE (value_1);
|
||||||
|
CREATE TABLE events_table_part_0 PARTITION OF events_table_part FOR VALUES FROM (0) TO (1);
|
||||||
|
CREATE TABLE events_table_part_1 PARTITION OF events_table_part FOR VALUES FROM (1) TO (2);
|
||||||
|
CREATE TABLE events_table_part_2 PARTITION OF events_table_part FOR VALUES FROM (2) TO (3);
|
||||||
|
CREATE TABLE events_table_part_3 PARTITION OF events_table_part FOR VALUES FROM (3) TO (4);
|
||||||
|
CREATE TABLE events_table_part_4 PARTITION OF events_table_part FOR VALUES FROM (4) TO (5);
|
||||||
|
CREATE TABLE events_table_part_5 PARTITION OF events_table_part FOR VALUES FROM (5) TO (6);
|
||||||
|
CREATE TABLE events_table_part_6 PARTITION OF events_table_part FOR VALUES FROM (6) TO (7);
|
||||||
|
CREATE TABLE events_table_part_7 PARTITION OF events_table_part FOR VALUES FROM (7) TO (8);
|
||||||
|
CREATE TABLE events_table_part_8 PARTITION OF events_table_part FOR VALUES FROM (8) TO (9);
|
||||||
|
SELECT create_distributed_table('events_table_part', 'user_id');
|
||||||
|
create_distributed_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
INSERT INTO events_table_part SELECT i, i %9, i %50 FROM generate_series(0, 100) i;
|
||||||
|
set client_min_messages to DEBUG1;
|
||||||
|
-- a union all query with 2 different levels of UNION ALL
|
||||||
|
SELECT COUNT(*)
|
||||||
|
FROM
|
||||||
|
(SELECT user_id AS user_id
|
||||||
|
FROM
|
||||||
|
(SELECT user_id AS user_id
|
||||||
|
FROM users_table_part
|
||||||
|
UNION ALL SELECT user_id AS user_id
|
||||||
|
FROM users_table_part) AS bar
|
||||||
|
UNION ALL SELECT user_id AS user_id
|
||||||
|
FROM users_table_part) AS fool LIMIT 1;
|
||||||
|
DEBUG: push down of limit count: 1
|
||||||
|
count
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
303
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- a union [all] query with 2 different levels of UNION [ALL]
|
||||||
|
SELECT COUNT(*)
|
||||||
|
FROM
|
||||||
|
(SELECT user_id AS user_id
|
||||||
|
FROM
|
||||||
|
(SELECT user_id AS user_id
|
||||||
|
FROM users_table_part
|
||||||
|
UNION ALL SELECT user_id AS user_id
|
||||||
|
FROM users_table_part) AS bar
|
||||||
|
UNION SELECT user_id AS user_id
|
||||||
|
FROM users_table_part) AS fool LIMIT 1;
|
||||||
|
DEBUG: push down of limit count: 1
|
||||||
|
count
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
101
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- a union all query with several levels and leaf queries
|
||||||
|
SELECT DISTINCT user_id
|
||||||
|
FROM
|
||||||
|
(SELECT user_id AS user_id
|
||||||
|
FROM
|
||||||
|
(SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 1
|
||||||
|
UNION ALL SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 2) AS bar
|
||||||
|
UNION ALL SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 3
|
||||||
|
UNION ALL
|
||||||
|
SELECT user_id AS user_id
|
||||||
|
FROM
|
||||||
|
(SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 4
|
||||||
|
UNION ALL SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 5) AS bar
|
||||||
|
UNION ALL
|
||||||
|
(SELECT user_id AS user_id
|
||||||
|
FROM
|
||||||
|
(SELECT DISTINCT user_id AS user_id FROM users_table_part WHERE value_1 = 6
|
||||||
|
UNION ALL SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 7) AS bar
|
||||||
|
UNION ALL SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 8)) AS bar
|
||||||
|
ORDER BY 1 LIMIT 1;
|
||||||
|
DEBUG: push down of limit count: 1
|
||||||
|
user_id
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
1
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- a union all query with several levels and leaf queries
|
||||||
|
-- on the partition tables
|
||||||
|
SELECT DISTINCT user_id
|
||||||
|
FROM
|
||||||
|
(SELECT user_id AS user_id
|
||||||
|
FROM
|
||||||
|
(SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 1
|
||||||
|
UNION ALL SELECT user_id AS user_id FROM users_table_part_2 WHERE value_1 = 2) AS bar
|
||||||
|
UNION ALL SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 3
|
||||||
|
UNION ALL
|
||||||
|
SELECT user_id AS user_id
|
||||||
|
FROM
|
||||||
|
(SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 4
|
||||||
|
UNION ALL SELECT user_id AS user_id FROM users_table_part_3 WHERE value_1 = 5) AS bar
|
||||||
|
UNION ALL
|
||||||
|
(SELECT user_id AS user_id
|
||||||
|
FROM
|
||||||
|
(SELECT DISTINCT user_id AS user_id FROM users_table_part WHERE value_1 = 6
|
||||||
|
UNION ALL SELECT user_id AS user_id FROM users_table_part_5 WHERE value_1 = 7) AS bar
|
||||||
|
UNION ALL SELECT user_id AS user_id FROM users_table_part_4 WHERE value_1 = 8)) AS bar
|
||||||
|
ORDER BY 1 LIMIT 1;
|
||||||
|
DEBUG: push down of limit count: 1
|
||||||
|
user_id
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
1
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- a union all query with a combine query on the coordinator
|
||||||
|
-- can still be pushed down
|
||||||
|
SELECT COUNT(DISTINCT user_id)
|
||||||
|
FROM
|
||||||
|
(SELECT user_id AS user_id
|
||||||
|
FROM
|
||||||
|
(SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 1
|
||||||
|
UNION ALL SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 2) AS bar
|
||||||
|
UNION ALL SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 3
|
||||||
|
UNION ALL
|
||||||
|
SELECT user_id AS user_id
|
||||||
|
FROM
|
||||||
|
(SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 4
|
||||||
|
UNION ALL SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 5) AS bar
|
||||||
|
UNION ALL
|
||||||
|
(SELECT user_id AS user_id
|
||||||
|
FROM
|
||||||
|
(SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 6
|
||||||
|
UNION ALL SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 7) AS bar
|
||||||
|
UNION ALL SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 8)) AS bar;
|
||||||
|
count
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
89
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- a union all query with ORDER BY LIMIT
|
||||||
|
SELECT COUNT(user_id)
|
||||||
|
FROM
|
||||||
|
(SELECT user_id AS user_id
|
||||||
|
FROM
|
||||||
|
(SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 1
|
||||||
|
UNION ALL SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 2) AS bar
|
||||||
|
UNION ALL SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 3
|
||||||
|
UNION ALL
|
||||||
|
SELECT user_id AS user_id
|
||||||
|
FROM
|
||||||
|
(SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 4
|
||||||
|
UNION ALL SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 5) AS bar
|
||||||
|
UNION ALL
|
||||||
|
(SELECT user_id AS user_id
|
||||||
|
FROM
|
||||||
|
(SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 6
|
||||||
|
UNION ALL SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 7) AS bar
|
||||||
|
UNION ALL SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 8)) AS bar
|
||||||
|
ORDER BY 1 DESC LIMIT 10;
|
||||||
|
DEBUG: push down of limit count: 10
|
||||||
|
count
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
89
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- a union all query where leaf queries have JOINs on distribution keys
|
||||||
|
-- can be pushded down
|
||||||
|
SELECT COUNT(user_id)
|
||||||
|
FROM
|
||||||
|
(SELECT user_id AS user_id
|
||||||
|
FROM
|
||||||
|
(SELECT user_id AS user_id FROM users_table_part JOIN events_table_part USING (user_id) WHERE users_table_part.value_1 = 1
|
||||||
|
UNION ALL SELECT user_id AS user_id FROM users_table_part JOIN events_table_part USING (user_id) WHERE users_table_part.value_1 = 2) AS bar
|
||||||
|
UNION ALL SELECT user_id AS user_id FROM users_table_part JOIN events_table_part USING (user_id) WHERE users_table_part.value_1 = 3
|
||||||
|
UNION ALL
|
||||||
|
SELECT user_id AS user_id
|
||||||
|
FROM
|
||||||
|
(SELECT user_id AS user_id FROM users_table_part JOIN events_table_part USING (user_id) WHERE users_table_part.value_1 = 4
|
||||||
|
UNION ALL SELECT user_id AS user_id FROM users_table_part JOIN events_table_part USING (user_id) WHERE users_table_part.value_1 = 5) AS bar
|
||||||
|
UNION ALL
|
||||||
|
(SELECT user_id AS user_id
|
||||||
|
FROM
|
||||||
|
(SELECT user_id AS user_id FROM users_table_part JOIN events_table_part USING (user_id) WHERE users_table_part.value_1 = 6
|
||||||
|
UNION ALL SELECT user_id AS user_id FROM users_table_part JOIN events_table_part USING (user_id) WHERE users_table_part.value_1 = 7) AS bar
|
||||||
|
UNION ALL SELECT user_id AS user_id FROM users_table_part JOIN events_table_part USING (user_id) WHERE users_table_part.value_1 = 8 GROUP BY user_id)) AS bar
|
||||||
|
ORDER BY 1 DESC LIMIT 10;
|
||||||
|
DEBUG: push down of limit count: 10
|
||||||
|
count
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
89
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- a union all query deep down inside a subquery can still be pushed down
|
||||||
|
SELECT COUNT(user_id) FROM (
|
||||||
|
SELECT user_id, random() FROM (
|
||||||
|
SELECT user_id, random() FROM (
|
||||||
|
SELECT user_id, random()
|
||||||
|
FROM
|
||||||
|
(SELECT user_id AS user_id
|
||||||
|
FROM
|
||||||
|
(SELECT user_id AS user_id FROM users_table_part JOIN events_table_part USING (user_id) WHERE users_table_part.value_1 = 1
|
||||||
|
UNION ALL SELECT user_id AS user_id FROM users_table_part JOIN events_table_part USING (user_id) WHERE users_table_part.value_1 = 2) AS bar
|
||||||
|
UNION ALL SELECT user_id AS user_id FROM users_table_part JOIN events_table_part USING (user_id) WHERE users_table_part.value_1 = 3
|
||||||
|
UNION ALL
|
||||||
|
SELECT user_id AS user_id
|
||||||
|
FROM
|
||||||
|
(SELECT user_id AS user_id FROM users_table_part JOIN events_table_part USING (user_id) WHERE users_table_part.value_1 = 4
|
||||||
|
UNION ALL SELECT user_id AS user_id FROM users_table_part JOIN events_table_part USING (user_id) WHERE users_table_part.value_1 = 5) AS bar
|
||||||
|
UNION ALL
|
||||||
|
(SELECT user_id AS user_id
|
||||||
|
FROM
|
||||||
|
(SELECT DISTINCT user_id AS user_id FROM users_table_part JOIN events_table_part USING (user_id) WHERE users_table_part.value_1 = 6
|
||||||
|
UNION ALL SELECT user_id AS user_id FROM users_table_part JOIN events_table_part USING (user_id) WHERE users_table_part.value_1 = 7 AND events_table_part.user_id IN (SELECT user_id FROM users_table_part WHERE users_table_part.value_2 = 3 AND events_table_part.user_id IN (SELECT user_id FROM users_table_part WHERE value_2 = 3))) AS bar
|
||||||
|
UNION ALL SELECT user_id AS user_id FROM users_table_part JOIN events_table_part USING (user_id) WHERE users_table_part.value_1 = 8 GROUP BY user_id)) AS bar
|
||||||
|
WHERE user_id < 2000 ) as level_1 ) as level_2 ) as level_3
|
||||||
|
ORDER BY 1 DESC LIMIT 10;
|
||||||
|
DEBUG: push down of limit count: 10
|
||||||
|
count
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
78
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- safe to pushdown
|
||||||
|
SELECT DISTINCT user_id FROM (
|
||||||
|
SELECT * FROM
|
||||||
|
(SELECT user_id FROM users_table_part UNION ALL SELECT user_id FROM users_table_part) as foo
|
||||||
|
JOIN
|
||||||
|
(SELECT user_id FROM users_table_part UNION ALL SELECT user_id FROM users_table_part) as bar
|
||||||
|
USING (user_id)
|
||||||
|
UNION ALL
|
||||||
|
SELECT * FROM
|
||||||
|
(SELECT user_id FROM users_table_part UNION ALL SELECT user_id FROM users_table_part) as foo
|
||||||
|
JOIN
|
||||||
|
(SELECT user_id FROM users_table_part UNION ALL SELECT user_id FROM users_table_part) as bar
|
||||||
|
USING (user_id)
|
||||||
|
) as foo1 ORDER BY 1 LIMIT 1;
|
||||||
|
DEBUG: push down of limit count: 1
|
||||||
|
user_id
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
0
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- safe to pushdown
|
||||||
|
SELECT DISTINCT user_id FROM (
|
||||||
|
SELECT * FROM (
|
||||||
|
SELECT * FROM
|
||||||
|
(SELECT user_id FROM users_table_part UNION ALL SELECT user_id FROM users_table_part) as foo
|
||||||
|
JOIN
|
||||||
|
(SELECT user_id FROM users_table_part UNION ALL SELECT user_id FROM users_table_part) as bar
|
||||||
|
USING (user_id)
|
||||||
|
UNION ALL
|
||||||
|
SELECT * FROM
|
||||||
|
(SELECT user_id FROM users_table_part UNION ALL SELECT user_id FROM users_table_part) as foo
|
||||||
|
JOIN
|
||||||
|
(SELECT user_id FROM users_table_part UNION ALL SELECT user_id FROM users_table_part) as bar
|
||||||
|
USING (user_id)) as bar
|
||||||
|
) as foo1 ORDER BY 1 LIMIT 1;
|
||||||
|
DEBUG: push down of limit count: 1
|
||||||
|
user_id
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
0
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- safe to pushdown
|
||||||
|
SELECT DISTINCT user_id FROM
|
||||||
|
(SELECT user_id FROM users_table_part UNION ALL SELECT user_id FROM users_table_part) as foo
|
||||||
|
JOIN
|
||||||
|
(SELECT user_id FROM users_table_part UNION ALL SELECT user_id FROM users_table_part) as bar
|
||||||
|
USING (user_id)
|
||||||
|
ORDER BY 1 LIMIT 1;
|
||||||
|
DEBUG: push down of limit count: 1
|
||||||
|
user_id
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
0
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
RESET client_min_messages;
|
||||||
|
DROP SCHEMA union_pushdown CASCADE;
|
||||||
|
NOTICE: drop cascades to 2 other objects
|
||||||
|
DETAIL: drop cascades to table users_table_part
|
||||||
|
drop cascades to table events_table_part
|
|
@ -72,7 +72,7 @@ test: multi_create_fdw
|
||||||
# ----------
|
# ----------
|
||||||
# Tests for recursive subquery planning
|
# Tests for recursive subquery planning
|
||||||
# ----------
|
# ----------
|
||||||
# NOTE: The next 6 were in parallel originally, but we got "too many
|
# NOTE: The next 7 were in parallel originally, but we got "too many
|
||||||
# connection" errors on CI. Requires investigation before doing them in
|
# connection" errors on CI. Requires investigation before doing them in
|
||||||
# parallel again.
|
# parallel again.
|
||||||
test: subquery_basics
|
test: subquery_basics
|
||||||
|
@ -80,6 +80,7 @@ test: subquery_local_tables
|
||||||
test: subquery_executors
|
test: subquery_executors
|
||||||
test: subquery_and_cte
|
test: subquery_and_cte
|
||||||
test: set_operations
|
test: set_operations
|
||||||
|
test: union_pushdown
|
||||||
test: set_operation_and_local_tables
|
test: set_operation_and_local_tables
|
||||||
|
|
||||||
test: subqueries_deep subquery_view subquery_partitioning subqueries_not_supported
|
test: subqueries_deep subquery_view subquery_partitioning subqueries_not_supported
|
||||||
|
|
|
@ -74,6 +74,7 @@ SELECT * FROM ((SELECT x, y FROM test) EXCEPT (SELECT y, x FROM test)) u ORDER B
|
||||||
SELECT * FROM ((SELECT * FROM test) EXCEPT (SELECT * FROM ref)) u ORDER BY 1,2;
|
SELECT * FROM ((SELECT * FROM test) EXCEPT (SELECT * FROM ref)) u ORDER BY 1,2;
|
||||||
SELECT * FROM ((SELECT * FROM ref) EXCEPT (SELECT * FROM ref)) u ORDER BY 1,2;
|
SELECT * FROM ((SELECT * FROM ref) EXCEPT (SELECT * FROM ref)) u ORDER BY 1,2;
|
||||||
|
|
||||||
|
|
||||||
-- unions can even be pushed down within a join
|
-- unions can even be pushed down within a join
|
||||||
SELECT * FROM ((SELECT * FROM test) UNION (SELECT * FROM test)) u JOIN test USING (x) ORDER BY 1,2;
|
SELECT * FROM ((SELECT * FROM test) UNION (SELECT * FROM test)) u JOIN test USING (x) ORDER BY 1,2;
|
||||||
SELECT * FROM ((SELECT * FROM test) UNION ALL (SELECT * FROM test)) u LEFT JOIN test USING (x) ORDER BY 1,2;
|
SELECT * FROM ((SELECT * FROM test) UNION ALL (SELECT * FROM test)) u LEFT JOIN test USING (x) ORDER BY 1,2;
|
||||||
|
@ -148,6 +149,27 @@ select avg(DISTINCT t.x) FROM ((SELECT avg(DISTINCT y) FROM test GROUP BY x) UNI
|
||||||
-- other agg. distincts are not supported when group by doesn't include partition key
|
-- other agg. distincts are not supported when group by doesn't include partition key
|
||||||
select count(DISTINCT t.x) FROM ((SELECT avg(DISTINCT y) FROM test GROUP BY y) UNION (SELECT avg(DISTINCT y) FROM test GROUP BY y)) as t(x) ORDER BY 1;
|
select count(DISTINCT t.x) FROM ((SELECT avg(DISTINCT y) FROM test GROUP BY y) UNION (SELECT avg(DISTINCT y) FROM test GROUP BY y)) as t(x) ORDER BY 1;
|
||||||
|
|
||||||
|
/* these are not safe to push down as the partition key index is different */
|
||||||
|
SELECT COUNT(*) FROM ((SELECT x,y FROM test) UNION ALL (SELECT y,x FROM test)) u;
|
||||||
|
|
||||||
|
/* this is safe to push down since the partition key index is the same */
|
||||||
|
SELECT COUNT(*) FROM (SELECT x,y FROM test UNION ALL SELECT x,y FROM test) foo;
|
||||||
|
SELECT COUNT(*) FROM
|
||||||
|
((SELECT x,y FROM test UNION ALL SELECT x,y FROM test)
|
||||||
|
UNION ALL
|
||||||
|
(SELECT x,y FROM test UNION ALL SELECT x,y FROM test)) foo;
|
||||||
|
|
||||||
|
SELECT COUNT(*)
|
||||||
|
FROM
|
||||||
|
(SELECT user_id AS user_id
|
||||||
|
FROM
|
||||||
|
(SELECT x AS user_id
|
||||||
|
FROM test
|
||||||
|
UNION ALL SELECT x AS user_id
|
||||||
|
FROM test) AS bar
|
||||||
|
UNION ALL SELECT x AS user_id
|
||||||
|
FROM test) AS fool LIMIT 1;
|
||||||
|
|
||||||
-- one of the leaves is a repartition join
|
-- one of the leaves is a repartition join
|
||||||
SET citus.enable_repartition_joins TO ON;
|
SET citus.enable_repartition_joins TO ON;
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,234 @@
|
||||||
|
CREATE SCHEMA union_pushdown;
|
||||||
|
SET search_path TO union_pushdown;
|
||||||
|
SET citus.shard_count TO 4;
|
||||||
|
SET citus.shard_replication_factor TO 1;
|
||||||
|
|
||||||
|
CREATE TABLE users_table_part(user_id bigint, value_1 int, value_2 int) PARTITION BY RANGE (value_1);
|
||||||
|
CREATE TABLE users_table_part_0 PARTITION OF users_table_part FOR VALUES FROM (0) TO (1);
|
||||||
|
CREATE TABLE users_table_part_1 PARTITION OF users_table_part FOR VALUES FROM (1) TO (2);
|
||||||
|
CREATE TABLE users_table_part_2 PARTITION OF users_table_part FOR VALUES FROM (2) TO (3);
|
||||||
|
CREATE TABLE users_table_part_3 PARTITION OF users_table_part FOR VALUES FROM (3) TO (4);
|
||||||
|
CREATE TABLE users_table_part_4 PARTITION OF users_table_part FOR VALUES FROM (4) TO (5);
|
||||||
|
CREATE TABLE users_table_part_5 PARTITION OF users_table_part FOR VALUES FROM (5) TO (6);
|
||||||
|
CREATE TABLE users_table_part_6 PARTITION OF users_table_part FOR VALUES FROM (6) TO (7);
|
||||||
|
CREATE TABLE users_table_part_7 PARTITION OF users_table_part FOR VALUES FROM (7) TO (8);
|
||||||
|
CREATE TABLE users_table_part_8 PARTITION OF users_table_part FOR VALUES FROM (8) TO (9);
|
||||||
|
SELECT create_distributed_table('users_table_part', 'user_id');
|
||||||
|
INSERT INTO users_table_part SELECT i, i %9, i %50 FROM generate_series(0, 100) i;
|
||||||
|
|
||||||
|
|
||||||
|
CREATE TABLE events_table_part(user_id bigint, value_1 int, value_2 int) PARTITION BY RANGE (value_1);
|
||||||
|
CREATE TABLE events_table_part_0 PARTITION OF events_table_part FOR VALUES FROM (0) TO (1);
|
||||||
|
CREATE TABLE events_table_part_1 PARTITION OF events_table_part FOR VALUES FROM (1) TO (2);
|
||||||
|
CREATE TABLE events_table_part_2 PARTITION OF events_table_part FOR VALUES FROM (2) TO (3);
|
||||||
|
CREATE TABLE events_table_part_3 PARTITION OF events_table_part FOR VALUES FROM (3) TO (4);
|
||||||
|
CREATE TABLE events_table_part_4 PARTITION OF events_table_part FOR VALUES FROM (4) TO (5);
|
||||||
|
CREATE TABLE events_table_part_5 PARTITION OF events_table_part FOR VALUES FROM (5) TO (6);
|
||||||
|
CREATE TABLE events_table_part_6 PARTITION OF events_table_part FOR VALUES FROM (6) TO (7);
|
||||||
|
CREATE TABLE events_table_part_7 PARTITION OF events_table_part FOR VALUES FROM (7) TO (8);
|
||||||
|
CREATE TABLE events_table_part_8 PARTITION OF events_table_part FOR VALUES FROM (8) TO (9);
|
||||||
|
SELECT create_distributed_table('events_table_part', 'user_id');
|
||||||
|
INSERT INTO events_table_part SELECT i, i %9, i %50 FROM generate_series(0, 100) i;
|
||||||
|
|
||||||
|
|
||||||
|
set client_min_messages to DEBUG1;
|
||||||
|
-- a union all query with 2 different levels of UNION ALL
|
||||||
|
SELECT COUNT(*)
|
||||||
|
FROM
|
||||||
|
(SELECT user_id AS user_id
|
||||||
|
FROM
|
||||||
|
(SELECT user_id AS user_id
|
||||||
|
FROM users_table_part
|
||||||
|
UNION ALL SELECT user_id AS user_id
|
||||||
|
FROM users_table_part) AS bar
|
||||||
|
UNION ALL SELECT user_id AS user_id
|
||||||
|
FROM users_table_part) AS fool LIMIT 1;
|
||||||
|
|
||||||
|
-- a union [all] query with 2 different levels of UNION [ALL]
|
||||||
|
SELECT COUNT(*)
|
||||||
|
FROM
|
||||||
|
(SELECT user_id AS user_id
|
||||||
|
FROM
|
||||||
|
(SELECT user_id AS user_id
|
||||||
|
FROM users_table_part
|
||||||
|
UNION ALL SELECT user_id AS user_id
|
||||||
|
FROM users_table_part) AS bar
|
||||||
|
UNION SELECT user_id AS user_id
|
||||||
|
FROM users_table_part) AS fool LIMIT 1;
|
||||||
|
|
||||||
|
-- a union all query with several levels and leaf queries
|
||||||
|
SELECT DISTINCT user_id
|
||||||
|
FROM
|
||||||
|
(SELECT user_id AS user_id
|
||||||
|
FROM
|
||||||
|
(SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 1
|
||||||
|
UNION ALL SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 2) AS bar
|
||||||
|
UNION ALL SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 3
|
||||||
|
UNION ALL
|
||||||
|
SELECT user_id AS user_id
|
||||||
|
FROM
|
||||||
|
(SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 4
|
||||||
|
UNION ALL SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 5) AS bar
|
||||||
|
UNION ALL
|
||||||
|
(SELECT user_id AS user_id
|
||||||
|
FROM
|
||||||
|
(SELECT DISTINCT user_id AS user_id FROM users_table_part WHERE value_1 = 6
|
||||||
|
UNION ALL SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 7) AS bar
|
||||||
|
UNION ALL SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 8)) AS bar
|
||||||
|
ORDER BY 1 LIMIT 1;
|
||||||
|
|
||||||
|
-- a union all query with several levels and leaf queries
|
||||||
|
-- on the partition tables
|
||||||
|
SELECT DISTINCT user_id
|
||||||
|
FROM
|
||||||
|
(SELECT user_id AS user_id
|
||||||
|
FROM
|
||||||
|
(SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 1
|
||||||
|
UNION ALL SELECT user_id AS user_id FROM users_table_part_2 WHERE value_1 = 2) AS bar
|
||||||
|
UNION ALL SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 3
|
||||||
|
UNION ALL
|
||||||
|
SELECT user_id AS user_id
|
||||||
|
FROM
|
||||||
|
(SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 4
|
||||||
|
UNION ALL SELECT user_id AS user_id FROM users_table_part_3 WHERE value_1 = 5) AS bar
|
||||||
|
UNION ALL
|
||||||
|
(SELECT user_id AS user_id
|
||||||
|
FROM
|
||||||
|
(SELECT DISTINCT user_id AS user_id FROM users_table_part WHERE value_1 = 6
|
||||||
|
UNION ALL SELECT user_id AS user_id FROM users_table_part_5 WHERE value_1 = 7) AS bar
|
||||||
|
UNION ALL SELECT user_id AS user_id FROM users_table_part_4 WHERE value_1 = 8)) AS bar
|
||||||
|
ORDER BY 1 LIMIT 1;
|
||||||
|
|
||||||
|
-- a union all query with a combine query on the coordinator
|
||||||
|
-- can still be pushed down
|
||||||
|
SELECT COUNT(DISTINCT user_id)
|
||||||
|
FROM
|
||||||
|
(SELECT user_id AS user_id
|
||||||
|
FROM
|
||||||
|
(SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 1
|
||||||
|
UNION ALL SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 2) AS bar
|
||||||
|
UNION ALL SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 3
|
||||||
|
UNION ALL
|
||||||
|
|
||||||
|
SELECT user_id AS user_id
|
||||||
|
FROM
|
||||||
|
(SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 4
|
||||||
|
UNION ALL SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 5) AS bar
|
||||||
|
UNION ALL
|
||||||
|
(SELECT user_id AS user_id
|
||||||
|
FROM
|
||||||
|
(SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 6
|
||||||
|
UNION ALL SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 7) AS bar
|
||||||
|
UNION ALL SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 8)) AS bar;
|
||||||
|
|
||||||
|
-- a union all query with ORDER BY LIMIT
|
||||||
|
SELECT COUNT(user_id)
|
||||||
|
FROM
|
||||||
|
(SELECT user_id AS user_id
|
||||||
|
FROM
|
||||||
|
(SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 1
|
||||||
|
UNION ALL SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 2) AS bar
|
||||||
|
UNION ALL SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 3
|
||||||
|
UNION ALL
|
||||||
|
|
||||||
|
SELECT user_id AS user_id
|
||||||
|
FROM
|
||||||
|
(SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 4
|
||||||
|
UNION ALL SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 5) AS bar
|
||||||
|
UNION ALL
|
||||||
|
(SELECT user_id AS user_id
|
||||||
|
FROM
|
||||||
|
(SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 6
|
||||||
|
UNION ALL SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 7) AS bar
|
||||||
|
UNION ALL SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 8)) AS bar
|
||||||
|
ORDER BY 1 DESC LIMIT 10;
|
||||||
|
|
||||||
|
-- a union all query where leaf queries have JOINs on distribution keys
|
||||||
|
-- can be pushded down
|
||||||
|
SELECT COUNT(user_id)
|
||||||
|
FROM
|
||||||
|
(SELECT user_id AS user_id
|
||||||
|
FROM
|
||||||
|
(SELECT user_id AS user_id FROM users_table_part JOIN events_table_part USING (user_id) WHERE users_table_part.value_1 = 1
|
||||||
|
UNION ALL SELECT user_id AS user_id FROM users_table_part JOIN events_table_part USING (user_id) WHERE users_table_part.value_1 = 2) AS bar
|
||||||
|
UNION ALL SELECT user_id AS user_id FROM users_table_part JOIN events_table_part USING (user_id) WHERE users_table_part.value_1 = 3
|
||||||
|
UNION ALL
|
||||||
|
|
||||||
|
SELECT user_id AS user_id
|
||||||
|
FROM
|
||||||
|
(SELECT user_id AS user_id FROM users_table_part JOIN events_table_part USING (user_id) WHERE users_table_part.value_1 = 4
|
||||||
|
UNION ALL SELECT user_id AS user_id FROM users_table_part JOIN events_table_part USING (user_id) WHERE users_table_part.value_1 = 5) AS bar
|
||||||
|
UNION ALL
|
||||||
|
(SELECT user_id AS user_id
|
||||||
|
FROM
|
||||||
|
(SELECT user_id AS user_id FROM users_table_part JOIN events_table_part USING (user_id) WHERE users_table_part.value_1 = 6
|
||||||
|
UNION ALL SELECT user_id AS user_id FROM users_table_part JOIN events_table_part USING (user_id) WHERE users_table_part.value_1 = 7) AS bar
|
||||||
|
UNION ALL SELECT user_id AS user_id FROM users_table_part JOIN events_table_part USING (user_id) WHERE users_table_part.value_1 = 8 GROUP BY user_id)) AS bar
|
||||||
|
ORDER BY 1 DESC LIMIT 10;
|
||||||
|
|
||||||
|
-- a union all query deep down inside a subquery can still be pushed down
|
||||||
|
SELECT COUNT(user_id) FROM (
|
||||||
|
SELECT user_id, random() FROM (
|
||||||
|
SELECT user_id, random() FROM (
|
||||||
|
SELECT user_id, random()
|
||||||
|
FROM
|
||||||
|
(SELECT user_id AS user_id
|
||||||
|
FROM
|
||||||
|
(SELECT user_id AS user_id FROM users_table_part JOIN events_table_part USING (user_id) WHERE users_table_part.value_1 = 1
|
||||||
|
UNION ALL SELECT user_id AS user_id FROM users_table_part JOIN events_table_part USING (user_id) WHERE users_table_part.value_1 = 2) AS bar
|
||||||
|
UNION ALL SELECT user_id AS user_id FROM users_table_part JOIN events_table_part USING (user_id) WHERE users_table_part.value_1 = 3
|
||||||
|
UNION ALL
|
||||||
|
SELECT user_id AS user_id
|
||||||
|
FROM
|
||||||
|
(SELECT user_id AS user_id FROM users_table_part JOIN events_table_part USING (user_id) WHERE users_table_part.value_1 = 4
|
||||||
|
UNION ALL SELECT user_id AS user_id FROM users_table_part JOIN events_table_part USING (user_id) WHERE users_table_part.value_1 = 5) AS bar
|
||||||
|
UNION ALL
|
||||||
|
(SELECT user_id AS user_id
|
||||||
|
FROM
|
||||||
|
(SELECT DISTINCT user_id AS user_id FROM users_table_part JOIN events_table_part USING (user_id) WHERE users_table_part.value_1 = 6
|
||||||
|
UNION ALL SELECT user_id AS user_id FROM users_table_part JOIN events_table_part USING (user_id) WHERE users_table_part.value_1 = 7 AND events_table_part.user_id IN (SELECT user_id FROM users_table_part WHERE users_table_part.value_2 = 3 AND events_table_part.user_id IN (SELECT user_id FROM users_table_part WHERE value_2 = 3))) AS bar
|
||||||
|
UNION ALL SELECT user_id AS user_id FROM users_table_part JOIN events_table_part USING (user_id) WHERE users_table_part.value_1 = 8 GROUP BY user_id)) AS bar
|
||||||
|
WHERE user_id < 2000 ) as level_1 ) as level_2 ) as level_3
|
||||||
|
ORDER BY 1 DESC LIMIT 10;
|
||||||
|
|
||||||
|
-- safe to pushdown
|
||||||
|
SELECT DISTINCT user_id FROM (
|
||||||
|
SELECT * FROM
|
||||||
|
(SELECT user_id FROM users_table_part UNION ALL SELECT user_id FROM users_table_part) as foo
|
||||||
|
JOIN
|
||||||
|
(SELECT user_id FROM users_table_part UNION ALL SELECT user_id FROM users_table_part) as bar
|
||||||
|
USING (user_id)
|
||||||
|
UNION ALL
|
||||||
|
SELECT * FROM
|
||||||
|
(SELECT user_id FROM users_table_part UNION ALL SELECT user_id FROM users_table_part) as foo
|
||||||
|
JOIN
|
||||||
|
(SELECT user_id FROM users_table_part UNION ALL SELECT user_id FROM users_table_part) as bar
|
||||||
|
USING (user_id)
|
||||||
|
) as foo1 ORDER BY 1 LIMIT 1;
|
||||||
|
|
||||||
|
-- safe to pushdown
|
||||||
|
SELECT DISTINCT user_id FROM (
|
||||||
|
SELECT * FROM (
|
||||||
|
SELECT * FROM
|
||||||
|
(SELECT user_id FROM users_table_part UNION ALL SELECT user_id FROM users_table_part) as foo
|
||||||
|
JOIN
|
||||||
|
(SELECT user_id FROM users_table_part UNION ALL SELECT user_id FROM users_table_part) as bar
|
||||||
|
USING (user_id)
|
||||||
|
UNION ALL
|
||||||
|
SELECT * FROM
|
||||||
|
(SELECT user_id FROM users_table_part UNION ALL SELECT user_id FROM users_table_part) as foo
|
||||||
|
JOIN
|
||||||
|
(SELECT user_id FROM users_table_part UNION ALL SELECT user_id FROM users_table_part) as bar
|
||||||
|
USING (user_id)) as bar
|
||||||
|
) as foo1 ORDER BY 1 LIMIT 1;
|
||||||
|
|
||||||
|
-- safe to pushdown
|
||||||
|
SELECT DISTINCT user_id FROM
|
||||||
|
(SELECT user_id FROM users_table_part UNION ALL SELECT user_id FROM users_table_part) as foo
|
||||||
|
JOIN
|
||||||
|
(SELECT user_id FROM users_table_part UNION ALL SELECT user_id FROM users_table_part) as bar
|
||||||
|
USING (user_id)
|
||||||
|
ORDER BY 1 LIMIT 1;
|
||||||
|
|
||||||
|
RESET client_min_messages;
|
||||||
|
DROP SCHEMA union_pushdown CASCADE;
|
Loading…
Reference in New Issue