diff --git a/src/backend/distributed/planner/distributed_planner.c b/src/backend/distributed/planner/distributed_planner.c index a0c27fc66..0040a4185 100644 --- a/src/backend/distributed/planner/distributed_planner.c +++ b/src/backend/distributed/planner/distributed_planner.c @@ -49,6 +49,7 @@ #include "executor/executor.h" #include "nodes/makefuncs.h" #include "nodes/nodeFuncs.h" +#include "nodes/pg_list.h" #include "parser/parsetree.h" #include "parser/parse_type.h" #if PG_VERSION_NUM >= PG_VERSION_12 @@ -98,6 +99,7 @@ static PlannedStmt * FinalizeNonRouterPlan(PlannedStmt *localPlan, DistributedPlan *distributedPlan, CustomScan *customScan); static PlannedStmt * FinalizeRouterPlan(PlannedStmt *localPlan, CustomScan *customScan); +static AppendRelInfo * FindTargetAppendRelInfo(PlannerInfo *root, int relationRteIndex); static List * makeTargetListFromCustomScanList(List *custom_scan_tlist); static List * makeCustomScanTargetlistFromExistingTargetList(List *existingTargetlist); static int32 BlessRecordExpressionList(List *exprs); @@ -124,6 +126,7 @@ static PlannedStmt * PlanFastPathDistributedStmt(DistributedPlanningContext *pla static PlannedStmt * PlanDistributedStmt(DistributedPlanningContext *planContext, int rteIdCounter); static RTEListProperties * GetRTEListProperties(List *rangeTableList); +static List * TranslatedVars(PlannerInfo *root, int relationIndex); /* Distributed planner hook */ @@ -1814,6 +1817,8 @@ multi_relation_restriction_hook(PlannerInfo *root, RelOptInfo *relOptInfo, /* see comments on GetVarFromAssignedParam() */ relationRestriction->outerPlanParamsList = OuterPlanParamsList(root); + relationRestriction->translatedVars = TranslatedVars(root, + relationRestriction->index); RelationRestrictionContext *relationRestrictionContext = plannerRestrictionContext->relationRestrictionContext; @@ -1837,6 +1842,61 @@ multi_relation_restriction_hook(PlannerInfo *root, RelOptInfo *relOptInfo, } +/* + * TranslatedVars deep copies the translated vars for the given relation index + * if there is any append rel list. + */ +static List * +TranslatedVars(PlannerInfo *root, int relationIndex) +{ + List *translatedVars = NIL; + + if (root->append_rel_list != NIL) + { + AppendRelInfo *targetAppendRelInfo = + FindTargetAppendRelInfo(root, relationIndex); + if (targetAppendRelInfo != NULL) + { + /* postgres deletes translated_vars after pg13, hence we deep copy them here */ + Node *targetNode = NULL; + foreach_ptr(targetNode, targetAppendRelInfo->translated_vars) + { + translatedVars = + lappend(translatedVars, copyObject(targetNode)); + } + } + } + return translatedVars; +} + + +/* + * FindTargetAppendRelInfo finds the target append rel info for the given + * relation rte index. + */ +static AppendRelInfo * +FindTargetAppendRelInfo(PlannerInfo *root, int relationRteIndex) +{ + AppendRelInfo *appendRelInfo = NULL; + + /* iterate on the queries that are part of UNION ALL subselects */ + foreach_ptr(appendRelInfo, root->append_rel_list) + { + /* + * We're only interested in the child rel that is equal to the + * relation we're investigating. Here we don't need to find the offset + * because postgres adds an offset to child_relid and parent_relid after + * calling multi_relation_restriction_hook. + */ + if (appendRelInfo->child_relid == relationRteIndex) + { + return appendRelInfo; + } + } + return NULL; +} + + /* * AdjustReadIntermediateResultCost adjusts the row count and total cost * of a read_intermediate_result call based on the file size. diff --git a/src/backend/distributed/planner/relation_restriction_equivalence.c b/src/backend/distributed/planner/relation_restriction_equivalence.c index 68216f9a3..5c7556eaa 100644 --- a/src/backend/distributed/planner/relation_restriction_equivalence.c +++ b/src/backend/distributed/planner/relation_restriction_equivalence.c @@ -83,7 +83,8 @@ typedef struct AttributeEquivalenceClassMember static bool ContextContainsLocalRelation(RelationRestrictionContext *restrictionContext); -static Var * FindUnionAllVar(PlannerInfo *root, List *appendRelList, Oid relationOid, +static int RangeTableOffsetCompat(PlannerInfo *root, AppendRelInfo *appendRelInfo); +static Var * FindUnionAllVar(PlannerInfo *root, List *translatedVars, Oid relationOid, Index relationRteIndex, Index *partitionKeyIndex); static bool ContainsMultipleDistributedRelations(PlannerRestrictionContext * plannerRestrictionContext); @@ -156,9 +157,12 @@ static JoinRestrictionContext * FilterJoinRestrictionContext( static bool RangeTableArrayContainsAnyRTEIdentities(RangeTblEntry **rangeTableEntries, int rangeTableArrayLength, Relids queryRteIdentities); -static int RangeTableOffsetCompat(PlannerInfo *root, AppendRelInfo *appendRelInfo); static Relids QueryRteIdentities(Query *queryTree); +#if PG_VERSION_NUM >= PG_VERSION_13 +static int ParentCountPriorToAppendRel(List *appendRelList, AppendRelInfo *appendRelInfo); +#endif + /* * AllDistributionKeysInQueryAreEqual returns true if either * (i) there exists join in the query and all relations joined on their @@ -279,7 +283,8 @@ SafeToPushdownUnionSubquery(PlannerRestrictionContext *plannerRestrictionContext */ if (appendRelList != NULL) { - varToBeAdded = FindUnionAllVar(relationPlannerRoot, appendRelList, + varToBeAdded = FindUnionAllVar(relationPlannerRoot, + relationRestriction->translatedVars, relationRestriction->relationId, relationRestriction->index, &partitionKeyIndex); @@ -373,63 +378,65 @@ SafeToPushdownUnionSubquery(PlannerRestrictionContext *plannerRestrictionContext } +/* + * RangeTableOffsetCompat returns the range table offset(in glob->finalrtable) for the appendRelInfo. + * For PG < 13 this is a no op. + */ +static int +RangeTableOffsetCompat(PlannerInfo *root, AppendRelInfo *appendRelInfo) +{ + #if PG_VERSION_NUM >= PG_VERSION_13 + int parentCount = ParentCountPriorToAppendRel(root->append_rel_list, appendRelInfo); + int skipParentCount = parentCount - 1; + + int i = 1; + for (; i < root->simple_rel_array_size; i++) + { + RangeTblEntry *rte = root->simple_rte_array[i]; + if (rte->inh) + { + /* + * We skip the previous parents because we want to find the offset + * for the given append rel info. + */ + if (skipParentCount > 0) + { + skipParentCount--; + continue; + } + break; + } + } + int indexInRtable = (i - 1); + + /* + * Postgres adds the global rte array size to parent_relid as an offset. + * Here we do the reverse operation: Commit on postgres side: + * 6ef77cf46e81f45716ec981cb08781d426181378 + */ + int parentRelIndex = appendRelInfo->parent_relid - 1; + return parentRelIndex - indexInRtable; + #else + return 0; + #endif +} + + /* * FindUnionAllVar finds the variable used in union all for the side that has * relationRteIndex as its index and the same varattno as the partition key of * the given relation with relationOid. */ static Var * -FindUnionAllVar(PlannerInfo *root, List *appendRelList, Oid relationOid, +FindUnionAllVar(PlannerInfo *root, List *translatedVars, Oid relationOid, Index relationRteIndex, Index *partitionKeyIndex) { - ListCell *appendRelCell = NULL; - AppendRelInfo *targetAppendRelInfo = NULL; - AttrNumber childAttrNumber = 0; - - *partitionKeyIndex = 0; - - /* iterate on the queries that are part of UNION ALL subselects */ - foreach(appendRelCell, appendRelList) - { - AppendRelInfo *appendRelInfo = (AppendRelInfo *) lfirst(appendRelCell); - - - int rtoffset = RangeTableOffsetCompat(root, appendRelInfo); - - /* - * We're only interested in the child rel that is equal to the - * relation we're investigating. - */ - if (appendRelInfo->child_relid - rtoffset == relationRteIndex) - { - targetAppendRelInfo = appendRelInfo; - break; - } - } - - if (!targetAppendRelInfo) - { - return NULL; - } - Var *relationPartitionKey = DistPartitionKeyOrError(relationOid); - #if PG_VERSION_NUM >= PG_VERSION_13 - for (; childAttrNumber < targetAppendRelInfo->num_child_cols; childAttrNumber++) - { - int curAttNo = targetAppendRelInfo->parent_colnos[childAttrNumber]; - if (curAttNo == relationPartitionKey->varattno) - { - *partitionKeyIndex = (childAttrNumber + 1); - int rtoffset = RangeTableOffsetCompat(root, targetAppendRelInfo); - relationPartitionKey->varno = targetAppendRelInfo->child_relid - rtoffset; - return relationPartitionKey; - } - } - #else + AttrNumber childAttrNumber = 0; + *partitionKeyIndex = 0; ListCell *translatedVarCell; - List *translaterVars = targetAppendRelInfo->translated_vars; - foreach(translatedVarCell, translaterVars) + foreach(translatedVarCell, translatedVars) { Node *targetNode = (Node *) lfirst(translatedVarCell); @@ -449,7 +456,6 @@ FindUnionAllVar(PlannerInfo *root, List *appendRelList, Oid relationOid, return targetVar; } } - #endif return NULL; } @@ -1387,31 +1393,32 @@ AddUnionAllSetOperationsToAttributeEquivalenceClass(AttributeEquivalenceClass ** } +#if PG_VERSION_NUM >= PG_VERSION_13 + /* - * RangeTableOffsetCompat returns the range table offset(in glob->finalrtable) for the appendRelInfo. - * For PG < 13 this is a no op. + * ParentCountPriorToAppendRel returns the number of parents that come before + * the given append rel info. */ static int -RangeTableOffsetCompat(PlannerInfo *root, AppendRelInfo *appendRelInfo) +ParentCountPriorToAppendRel(List *appendRelList, AppendRelInfo *targetAppendRelInfo) { - #if PG_VERSION_NUM >= PG_VERSION_13 - int i = 1; - for (; i < root->simple_rel_array_size; i++) + int targetParentIndex = targetAppendRelInfo->parent_relid; + Bitmapset *parent_ids = NULL; + AppendRelInfo *appendRelInfo = NULL; + foreach_ptr(appendRelInfo, appendRelList) { - RangeTblEntry *rte = root->simple_rte_array[i]; - if (rte->inh) + int curParentIndex = appendRelInfo->parent_relid; + if (curParentIndex <= targetParentIndex) { - break; + parent_ids = bms_add_member(parent_ids, curParentIndex); } } - int indexInRtable = (i - 1); - return appendRelInfo->parent_relid - 1 - (indexInRtable); - #else - return 0; - #endif + return bms_num_members(parent_ids); } +#endif + /* * AddUnionSetOperationsToAttributeEquivalenceClass recursively iterates on all the * setOperations and adds each corresponding target entry to the given equivalence diff --git a/src/include/distributed/distributed_planner.h b/src/include/distributed/distributed_planner.h index 2a0433e07..34b9d5b0f 100644 --- a/src/include/distributed/distributed_planner.h +++ b/src/include/distributed/distributed_planner.h @@ -67,6 +67,9 @@ typedef struct RelationRestriction /* list of RootPlanParams for all outer nodes */ List *outerPlanParamsList; + + /* list of translated vars, this is copied from postgres since it gets deleted on postgres*/ + List *translatedVars; } RelationRestriction; typedef struct JoinRestrictionContext diff --git a/src/include/distributed/relation_restriction_equivalence.h b/src/include/distributed/relation_restriction_equivalence.h index bfa650c0e..e1ece87e6 100644 --- a/src/include/distributed/relation_restriction_equivalence.h +++ b/src/include/distributed/relation_restriction_equivalence.h @@ -33,7 +33,6 @@ extern List * GenerateAllAttributeEquivalences(PlannerRestrictionContext * plannerRestrictionContext); extern uint32 UniqueRelationCount(RelationRestrictionContext *restrictionContext, CitusTableType tableType); - extern List * DistributedRelationIdList(Query *query); extern PlannerRestrictionContext * FilterPlannerRestrictionForQuery( PlannerRestrictionContext *plannerRestrictionContext, diff --git a/src/test/regress/bin/normalize.sed b/src/test/regress/bin/normalize.sed index a9ef78208..fb8b9e1fa 100644 --- a/src/test/regress/bin/normalize.sed +++ b/src/test/regress/bin/normalize.sed @@ -191,8 +191,8 @@ s/relation with OID [0-9]+ does not exist/relation with OID XXXX does not exist/ # ignore JIT related messages /^DEBUG: probing availability of JIT.*/d /^DEBUG: provider not available, disabling JIT for current session.*/d - - +/^DEBUG: time to inline:.*/d +/^DEBUG: successfully loaded JIT.*/d # ignore timing statistics for VACUUM VERBOSE /CPU: user: .*s, system: .*s, elapsed: .*s/d diff --git a/src/test/regress/expected/set_operations.out b/src/test/regress/expected/set_operations.out index 005f92a85..472e5d05c 100644 --- a/src/test/regress/expected/set_operations.out +++ b/src/test/regress/expected/set_operations.out @@ -829,6 +829,57 @@ DEBUG: Router planner cannot handle multi-shard select queries DEBUG: Router planner cannot handle multi-shard select queries ERROR: cannot compute aggregate (distinct) DETAIL: table partitioning is unsuitable for aggregate (distinct) +/* these are not safe to push down as the partition key index is different */ +SELECT COUNT(*) FROM ((SELECT x,y FROM test) UNION ALL (SELECT y,x FROM test)) u; +DEBUG: Router planner cannot handle multi-shard select queries +DEBUG: Router planner cannot handle multi-shard select queries +DEBUG: generating subplan XXX_1 for subquery SELECT x, y FROM recursive_union.test +DEBUG: Router planner cannot handle multi-shard select queries +DEBUG: generating subplan XXX_2 for subquery SELECT y, x FROM recursive_union.test +DEBUG: Creating router plan +DEBUG: generating subplan XXX_3 for subquery SELECT intermediate_result.x, intermediate_result.y FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(x integer, y integer) UNION ALL SELECT intermediate_result.y, intermediate_result.x FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(y integer, x integer) +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT intermediate_result.x, intermediate_result.y FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(x integer, y integer)) u +DEBUG: Creating router plan + count +--------------------------------------------------------------------- + 4 +(1 row) + +/* this is safe to push down since the partition key index is the same */ +SELECT COUNT(*) FROM (SELECT x,y FROM test UNION ALL SELECT x,y FROM test) foo; +DEBUG: Router planner cannot handle multi-shard select queries + count +--------------------------------------------------------------------- + 4 +(1 row) + +SELECT COUNT(*) FROM + ((SELECT x,y FROM test UNION ALL SELECT x,y FROM test) + UNION ALL + (SELECT x,y FROM test UNION ALL SELECT x,y FROM test)) foo; +DEBUG: Router planner cannot handle multi-shard select queries + count +--------------------------------------------------------------------- + 8 +(1 row) + +SELECT COUNT(*) +FROM + (SELECT user_id AS user_id + FROM + (SELECT x AS user_id + FROM test + UNION ALL SELECT x AS user_id + FROM test) AS bar + UNION ALL SELECT x AS user_id + FROM test) AS fool LIMIT 1; +DEBUG: Router planner cannot handle multi-shard select queries +DEBUG: push down of limit count: 1 + count +--------------------------------------------------------------------- + 6 +(1 row) + -- one of the leaves is a repartition join SET citus.enable_repartition_joins TO ON; -- repartition is recursively planned before the set operation diff --git a/src/test/regress/expected/union_pushdown.out b/src/test/regress/expected/union_pushdown.out new file mode 100644 index 000000000..b63aa93ee --- /dev/null +++ b/src/test/regress/expected/union_pushdown.out @@ -0,0 +1,293 @@ +CREATE SCHEMA union_pushdown; +SET search_path TO union_pushdown; +SET citus.shard_count TO 4; +SET citus.shard_replication_factor TO 1; +CREATE TABLE users_table_part(user_id bigint, value_1 int, value_2 int) PARTITION BY RANGE (value_1); +CREATE TABLE users_table_part_0 PARTITION OF users_table_part FOR VALUES FROM (0) TO (1); +CREATE TABLE users_table_part_1 PARTITION OF users_table_part FOR VALUES FROM (1) TO (2); +CREATE TABLE users_table_part_2 PARTITION OF users_table_part FOR VALUES FROM (2) TO (3); +CREATE TABLE users_table_part_3 PARTITION OF users_table_part FOR VALUES FROM (3) TO (4); +CREATE TABLE users_table_part_4 PARTITION OF users_table_part FOR VALUES FROM (4) TO (5); +CREATE TABLE users_table_part_5 PARTITION OF users_table_part FOR VALUES FROM (5) TO (6); +CREATE TABLE users_table_part_6 PARTITION OF users_table_part FOR VALUES FROM (6) TO (7); +CREATE TABLE users_table_part_7 PARTITION OF users_table_part FOR VALUES FROM (7) TO (8); +CREATE TABLE users_table_part_8 PARTITION OF users_table_part FOR VALUES FROM (8) TO (9); +SELECT create_distributed_table('users_table_part', 'user_id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO users_table_part SELECT i, i %9, i %50 FROM generate_series(0, 100) i; +CREATE TABLE events_table_part(user_id bigint, value_1 int, value_2 int) PARTITION BY RANGE (value_1); +CREATE TABLE events_table_part_0 PARTITION OF events_table_part FOR VALUES FROM (0) TO (1); +CREATE TABLE events_table_part_1 PARTITION OF events_table_part FOR VALUES FROM (1) TO (2); +CREATE TABLE events_table_part_2 PARTITION OF events_table_part FOR VALUES FROM (2) TO (3); +CREATE TABLE events_table_part_3 PARTITION OF events_table_part FOR VALUES FROM (3) TO (4); +CREATE TABLE events_table_part_4 PARTITION OF events_table_part FOR VALUES FROM (4) TO (5); +CREATE TABLE events_table_part_5 PARTITION OF events_table_part FOR VALUES FROM (5) TO (6); +CREATE TABLE events_table_part_6 PARTITION OF events_table_part FOR VALUES FROM (6) TO (7); +CREATE TABLE events_table_part_7 PARTITION OF events_table_part FOR VALUES FROM (7) TO (8); +CREATE TABLE events_table_part_8 PARTITION OF events_table_part FOR VALUES FROM (8) TO (9); +SELECT create_distributed_table('events_table_part', 'user_id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO events_table_part SELECT i, i %9, i %50 FROM generate_series(0, 100) i; +set client_min_messages to DEBUG1; +-- a union all query with 2 different levels of UNION ALL +SELECT COUNT(*) +FROM + (SELECT user_id AS user_id + FROM + (SELECT user_id AS user_id + FROM users_table_part + UNION ALL SELECT user_id AS user_id + FROM users_table_part) AS bar + UNION ALL SELECT user_id AS user_id + FROM users_table_part) AS fool LIMIT 1; +DEBUG: push down of limit count: 1 + count +--------------------------------------------------------------------- + 303 +(1 row) + +-- a union [all] query with 2 different levels of UNION [ALL] +SELECT COUNT(*) +FROM + (SELECT user_id AS user_id + FROM + (SELECT user_id AS user_id + FROM users_table_part + UNION ALL SELECT user_id AS user_id + FROM users_table_part) AS bar + UNION SELECT user_id AS user_id + FROM users_table_part) AS fool LIMIT 1; +DEBUG: push down of limit count: 1 + count +--------------------------------------------------------------------- + 101 +(1 row) + +-- a union all query with several levels and leaf queries +SELECT DISTINCT user_id +FROM + (SELECT user_id AS user_id + FROM + (SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 1 + UNION ALL SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 2) AS bar + UNION ALL SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 3 + UNION ALL + SELECT user_id AS user_id + FROM + (SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 4 + UNION ALL SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 5) AS bar + UNION ALL + (SELECT user_id AS user_id + FROM + (SELECT DISTINCT user_id AS user_id FROM users_table_part WHERE value_1 = 6 + UNION ALL SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 7) AS bar + UNION ALL SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 8)) AS bar +ORDER BY 1 LIMIT 1; +DEBUG: push down of limit count: 1 + user_id +--------------------------------------------------------------------- + 1 +(1 row) + +-- a union all query with several levels and leaf queries +-- on the partition tables +SELECT DISTINCT user_id +FROM + (SELECT user_id AS user_id + FROM + (SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 1 + UNION ALL SELECT user_id AS user_id FROM users_table_part_2 WHERE value_1 = 2) AS bar + UNION ALL SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 3 + UNION ALL + SELECT user_id AS user_id + FROM + (SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 4 + UNION ALL SELECT user_id AS user_id FROM users_table_part_3 WHERE value_1 = 5) AS bar + UNION ALL + (SELECT user_id AS user_id + FROM + (SELECT DISTINCT user_id AS user_id FROM users_table_part WHERE value_1 = 6 + UNION ALL SELECT user_id AS user_id FROM users_table_part_5 WHERE value_1 = 7) AS bar + UNION ALL SELECT user_id AS user_id FROM users_table_part_4 WHERE value_1 = 8)) AS bar +ORDER BY 1 LIMIT 1; +DEBUG: push down of limit count: 1 + user_id +--------------------------------------------------------------------- + 1 +(1 row) + +-- a union all query with a combine query on the coordinator +-- can still be pushed down +SELECT COUNT(DISTINCT user_id) +FROM + (SELECT user_id AS user_id + FROM + (SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 1 + UNION ALL SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 2) AS bar + UNION ALL SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 3 + UNION ALL + SELECT user_id AS user_id + FROM + (SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 4 + UNION ALL SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 5) AS bar + UNION ALL + (SELECT user_id AS user_id + FROM + (SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 6 + UNION ALL SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 7) AS bar + UNION ALL SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 8)) AS bar; + count +--------------------------------------------------------------------- + 89 +(1 row) + +-- a union all query with ORDER BY LIMIT +SELECT COUNT(user_id) +FROM + (SELECT user_id AS user_id + FROM + (SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 1 + UNION ALL SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 2) AS bar + UNION ALL SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 3 + UNION ALL + SELECT user_id AS user_id + FROM + (SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 4 + UNION ALL SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 5) AS bar + UNION ALL + (SELECT user_id AS user_id + FROM + (SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 6 + UNION ALL SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 7) AS bar + UNION ALL SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 8)) AS bar +ORDER BY 1 DESC LIMIT 10; +DEBUG: push down of limit count: 10 + count +--------------------------------------------------------------------- + 89 +(1 row) + +-- a union all query where leaf queries have JOINs on distribution keys +-- can be pushded down +SELECT COUNT(user_id) +FROM + (SELECT user_id AS user_id + FROM + (SELECT user_id AS user_id FROM users_table_part JOIN events_table_part USING (user_id) WHERE users_table_part.value_1 = 1 + UNION ALL SELECT user_id AS user_id FROM users_table_part JOIN events_table_part USING (user_id) WHERE users_table_part.value_1 = 2) AS bar + UNION ALL SELECT user_id AS user_id FROM users_table_part JOIN events_table_part USING (user_id) WHERE users_table_part.value_1 = 3 + UNION ALL + SELECT user_id AS user_id + FROM + (SELECT user_id AS user_id FROM users_table_part JOIN events_table_part USING (user_id) WHERE users_table_part.value_1 = 4 + UNION ALL SELECT user_id AS user_id FROM users_table_part JOIN events_table_part USING (user_id) WHERE users_table_part.value_1 = 5) AS bar + UNION ALL + (SELECT user_id AS user_id + FROM + (SELECT user_id AS user_id FROM users_table_part JOIN events_table_part USING (user_id) WHERE users_table_part.value_1 = 6 + UNION ALL SELECT user_id AS user_id FROM users_table_part JOIN events_table_part USING (user_id) WHERE users_table_part.value_1 = 7) AS bar + UNION ALL SELECT user_id AS user_id FROM users_table_part JOIN events_table_part USING (user_id) WHERE users_table_part.value_1 = 8 GROUP BY user_id)) AS bar +ORDER BY 1 DESC LIMIT 10; +DEBUG: push down of limit count: 10 + count +--------------------------------------------------------------------- + 89 +(1 row) + +-- a union all query deep down inside a subquery can still be pushed down +SELECT COUNT(user_id) FROM ( +SELECT user_id, random() FROM ( +SELECT user_id, random() FROM ( +SELECT user_id, random() +FROM + (SELECT user_id AS user_id + FROM + (SELECT user_id AS user_id FROM users_table_part JOIN events_table_part USING (user_id) WHERE users_table_part.value_1 = 1 + UNION ALL SELECT user_id AS user_id FROM users_table_part JOIN events_table_part USING (user_id) WHERE users_table_part.value_1 = 2) AS bar + UNION ALL SELECT user_id AS user_id FROM users_table_part JOIN events_table_part USING (user_id) WHERE users_table_part.value_1 = 3 + UNION ALL + SELECT user_id AS user_id + FROM + (SELECT user_id AS user_id FROM users_table_part JOIN events_table_part USING (user_id) WHERE users_table_part.value_1 = 4 + UNION ALL SELECT user_id AS user_id FROM users_table_part JOIN events_table_part USING (user_id) WHERE users_table_part.value_1 = 5) AS bar + UNION ALL + (SELECT user_id AS user_id + FROM + (SELECT DISTINCT user_id AS user_id FROM users_table_part JOIN events_table_part USING (user_id) WHERE users_table_part.value_1 = 6 + UNION ALL SELECT user_id AS user_id FROM users_table_part JOIN events_table_part USING (user_id) WHERE users_table_part.value_1 = 7 AND events_table_part.user_id IN (SELECT user_id FROM users_table_part WHERE users_table_part.value_2 = 3 AND events_table_part.user_id IN (SELECT user_id FROM users_table_part WHERE value_2 = 3))) AS bar + UNION ALL SELECT user_id AS user_id FROM users_table_part JOIN events_table_part USING (user_id) WHERE users_table_part.value_1 = 8 GROUP BY user_id)) AS bar + WHERE user_id < 2000 ) as level_1 ) as level_2 ) as level_3 +ORDER BY 1 DESC LIMIT 10; +DEBUG: push down of limit count: 10 + count +--------------------------------------------------------------------- + 78 +(1 row) + +-- safe to pushdown +SELECT DISTINCT user_id FROM ( + SELECT * FROM + (SELECT user_id FROM users_table_part UNION ALL SELECT user_id FROM users_table_part) as foo + JOIN + (SELECT user_id FROM users_table_part UNION ALL SELECT user_id FROM users_table_part) as bar + USING (user_id) + UNION ALL + SELECT * FROM + (SELECT user_id FROM users_table_part UNION ALL SELECT user_id FROM users_table_part) as foo + JOIN + (SELECT user_id FROM users_table_part UNION ALL SELECT user_id FROM users_table_part) as bar + USING (user_id) +) as foo1 ORDER BY 1 LIMIT 1; +DEBUG: push down of limit count: 1 + user_id +--------------------------------------------------------------------- + 0 +(1 row) + +-- safe to pushdown +SELECT DISTINCT user_id FROM ( + SELECT * FROM ( + SELECT * FROM + (SELECT user_id FROM users_table_part UNION ALL SELECT user_id FROM users_table_part) as foo + JOIN + (SELECT user_id FROM users_table_part UNION ALL SELECT user_id FROM users_table_part) as bar + USING (user_id) + UNION ALL + SELECT * FROM + (SELECT user_id FROM users_table_part UNION ALL SELECT user_id FROM users_table_part) as foo + JOIN + (SELECT user_id FROM users_table_part UNION ALL SELECT user_id FROM users_table_part) as bar + USING (user_id)) as bar +) as foo1 ORDER BY 1 LIMIT 1; +DEBUG: push down of limit count: 1 + user_id +--------------------------------------------------------------------- + 0 +(1 row) + +-- safe to pushdown +SELECT DISTINCT user_id FROM + (SELECT user_id FROM users_table_part UNION ALL SELECT user_id FROM users_table_part) as foo + JOIN + (SELECT user_id FROM users_table_part UNION ALL SELECT user_id FROM users_table_part) as bar +USING (user_id) +ORDER BY 1 LIMIT 1; +DEBUG: push down of limit count: 1 + user_id +--------------------------------------------------------------------- + 0 +(1 row) + +RESET client_min_messages; +DROP SCHEMA union_pushdown CASCADE; +NOTICE: drop cascades to 2 other objects +DETAIL: drop cascades to table users_table_part +drop cascades to table events_table_part diff --git a/src/test/regress/multi_schedule b/src/test/regress/multi_schedule index 2a86c1efd..d967b4a5e 100644 --- a/src/test/regress/multi_schedule +++ b/src/test/regress/multi_schedule @@ -72,7 +72,7 @@ test: multi_create_fdw # ---------- # Tests for recursive subquery planning # ---------- -# NOTE: The next 6 were in parallel originally, but we got "too many +# NOTE: The next 7 were in parallel originally, but we got "too many # connection" errors on CI. Requires investigation before doing them in # parallel again. test: subquery_basics @@ -80,6 +80,7 @@ test: subquery_local_tables test: subquery_executors test: subquery_and_cte test: set_operations +test: union_pushdown test: set_operation_and_local_tables test: subqueries_deep subquery_view subquery_partitioning subqueries_not_supported diff --git a/src/test/regress/sql/set_operations.sql b/src/test/regress/sql/set_operations.sql index be9554835..9f66f4dfb 100644 --- a/src/test/regress/sql/set_operations.sql +++ b/src/test/regress/sql/set_operations.sql @@ -74,6 +74,7 @@ SELECT * FROM ((SELECT x, y FROM test) EXCEPT (SELECT y, x FROM test)) u ORDER B SELECT * FROM ((SELECT * FROM test) EXCEPT (SELECT * FROM ref)) u ORDER BY 1,2; SELECT * FROM ((SELECT * FROM ref) EXCEPT (SELECT * FROM ref)) u ORDER BY 1,2; + -- unions can even be pushed down within a join SELECT * FROM ((SELECT * FROM test) UNION (SELECT * FROM test)) u JOIN test USING (x) ORDER BY 1,2; SELECT * FROM ((SELECT * FROM test) UNION ALL (SELECT * FROM test)) u LEFT JOIN test USING (x) ORDER BY 1,2; @@ -148,6 +149,27 @@ select avg(DISTINCT t.x) FROM ((SELECT avg(DISTINCT y) FROM test GROUP BY x) UNI -- other agg. distincts are not supported when group by doesn't include partition key select count(DISTINCT t.x) FROM ((SELECT avg(DISTINCT y) FROM test GROUP BY y) UNION (SELECT avg(DISTINCT y) FROM test GROUP BY y)) as t(x) ORDER BY 1; +/* these are not safe to push down as the partition key index is different */ +SELECT COUNT(*) FROM ((SELECT x,y FROM test) UNION ALL (SELECT y,x FROM test)) u; + +/* this is safe to push down since the partition key index is the same */ +SELECT COUNT(*) FROM (SELECT x,y FROM test UNION ALL SELECT x,y FROM test) foo; +SELECT COUNT(*) FROM + ((SELECT x,y FROM test UNION ALL SELECT x,y FROM test) + UNION ALL + (SELECT x,y FROM test UNION ALL SELECT x,y FROM test)) foo; + +SELECT COUNT(*) +FROM + (SELECT user_id AS user_id + FROM + (SELECT x AS user_id + FROM test + UNION ALL SELECT x AS user_id + FROM test) AS bar + UNION ALL SELECT x AS user_id + FROM test) AS fool LIMIT 1; + -- one of the leaves is a repartition join SET citus.enable_repartition_joins TO ON; diff --git a/src/test/regress/sql/union_pushdown.sql b/src/test/regress/sql/union_pushdown.sql new file mode 100644 index 000000000..c020e2cc4 --- /dev/null +++ b/src/test/regress/sql/union_pushdown.sql @@ -0,0 +1,234 @@ +CREATE SCHEMA union_pushdown; +SET search_path TO union_pushdown; +SET citus.shard_count TO 4; +SET citus.shard_replication_factor TO 1; + +CREATE TABLE users_table_part(user_id bigint, value_1 int, value_2 int) PARTITION BY RANGE (value_1); +CREATE TABLE users_table_part_0 PARTITION OF users_table_part FOR VALUES FROM (0) TO (1); +CREATE TABLE users_table_part_1 PARTITION OF users_table_part FOR VALUES FROM (1) TO (2); +CREATE TABLE users_table_part_2 PARTITION OF users_table_part FOR VALUES FROM (2) TO (3); +CREATE TABLE users_table_part_3 PARTITION OF users_table_part FOR VALUES FROM (3) TO (4); +CREATE TABLE users_table_part_4 PARTITION OF users_table_part FOR VALUES FROM (4) TO (5); +CREATE TABLE users_table_part_5 PARTITION OF users_table_part FOR VALUES FROM (5) TO (6); +CREATE TABLE users_table_part_6 PARTITION OF users_table_part FOR VALUES FROM (6) TO (7); +CREATE TABLE users_table_part_7 PARTITION OF users_table_part FOR VALUES FROM (7) TO (8); +CREATE TABLE users_table_part_8 PARTITION OF users_table_part FOR VALUES FROM (8) TO (9); +SELECT create_distributed_table('users_table_part', 'user_id'); +INSERT INTO users_table_part SELECT i, i %9, i %50 FROM generate_series(0, 100) i; + + +CREATE TABLE events_table_part(user_id bigint, value_1 int, value_2 int) PARTITION BY RANGE (value_1); +CREATE TABLE events_table_part_0 PARTITION OF events_table_part FOR VALUES FROM (0) TO (1); +CREATE TABLE events_table_part_1 PARTITION OF events_table_part FOR VALUES FROM (1) TO (2); +CREATE TABLE events_table_part_2 PARTITION OF events_table_part FOR VALUES FROM (2) TO (3); +CREATE TABLE events_table_part_3 PARTITION OF events_table_part FOR VALUES FROM (3) TO (4); +CREATE TABLE events_table_part_4 PARTITION OF events_table_part FOR VALUES FROM (4) TO (5); +CREATE TABLE events_table_part_5 PARTITION OF events_table_part FOR VALUES FROM (5) TO (6); +CREATE TABLE events_table_part_6 PARTITION OF events_table_part FOR VALUES FROM (6) TO (7); +CREATE TABLE events_table_part_7 PARTITION OF events_table_part FOR VALUES FROM (7) TO (8); +CREATE TABLE events_table_part_8 PARTITION OF events_table_part FOR VALUES FROM (8) TO (9); +SELECT create_distributed_table('events_table_part', 'user_id'); +INSERT INTO events_table_part SELECT i, i %9, i %50 FROM generate_series(0, 100) i; + + +set client_min_messages to DEBUG1; +-- a union all query with 2 different levels of UNION ALL +SELECT COUNT(*) +FROM + (SELECT user_id AS user_id + FROM + (SELECT user_id AS user_id + FROM users_table_part + UNION ALL SELECT user_id AS user_id + FROM users_table_part) AS bar + UNION ALL SELECT user_id AS user_id + FROM users_table_part) AS fool LIMIT 1; + +-- a union [all] query with 2 different levels of UNION [ALL] +SELECT COUNT(*) +FROM + (SELECT user_id AS user_id + FROM + (SELECT user_id AS user_id + FROM users_table_part + UNION ALL SELECT user_id AS user_id + FROM users_table_part) AS bar + UNION SELECT user_id AS user_id + FROM users_table_part) AS fool LIMIT 1; + +-- a union all query with several levels and leaf queries +SELECT DISTINCT user_id +FROM + (SELECT user_id AS user_id + FROM + (SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 1 + UNION ALL SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 2) AS bar + UNION ALL SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 3 + UNION ALL + SELECT user_id AS user_id + FROM + (SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 4 + UNION ALL SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 5) AS bar + UNION ALL + (SELECT user_id AS user_id + FROM + (SELECT DISTINCT user_id AS user_id FROM users_table_part WHERE value_1 = 6 + UNION ALL SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 7) AS bar + UNION ALL SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 8)) AS bar +ORDER BY 1 LIMIT 1; + +-- a union all query with several levels and leaf queries +-- on the partition tables +SELECT DISTINCT user_id +FROM + (SELECT user_id AS user_id + FROM + (SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 1 + UNION ALL SELECT user_id AS user_id FROM users_table_part_2 WHERE value_1 = 2) AS bar + UNION ALL SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 3 + UNION ALL + SELECT user_id AS user_id + FROM + (SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 4 + UNION ALL SELECT user_id AS user_id FROM users_table_part_3 WHERE value_1 = 5) AS bar + UNION ALL + (SELECT user_id AS user_id + FROM + (SELECT DISTINCT user_id AS user_id FROM users_table_part WHERE value_1 = 6 + UNION ALL SELECT user_id AS user_id FROM users_table_part_5 WHERE value_1 = 7) AS bar + UNION ALL SELECT user_id AS user_id FROM users_table_part_4 WHERE value_1 = 8)) AS bar +ORDER BY 1 LIMIT 1; + +-- a union all query with a combine query on the coordinator +-- can still be pushed down +SELECT COUNT(DISTINCT user_id) +FROM + (SELECT user_id AS user_id + FROM + (SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 1 + UNION ALL SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 2) AS bar + UNION ALL SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 3 + UNION ALL + + SELECT user_id AS user_id + FROM + (SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 4 + UNION ALL SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 5) AS bar + UNION ALL + (SELECT user_id AS user_id + FROM + (SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 6 + UNION ALL SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 7) AS bar + UNION ALL SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 8)) AS bar; + +-- a union all query with ORDER BY LIMIT +SELECT COUNT(user_id) +FROM + (SELECT user_id AS user_id + FROM + (SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 1 + UNION ALL SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 2) AS bar + UNION ALL SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 3 + UNION ALL + + SELECT user_id AS user_id + FROM + (SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 4 + UNION ALL SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 5) AS bar + UNION ALL + (SELECT user_id AS user_id + FROM + (SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 6 + UNION ALL SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 7) AS bar + UNION ALL SELECT user_id AS user_id FROM users_table_part WHERE value_1 = 8)) AS bar +ORDER BY 1 DESC LIMIT 10; + +-- a union all query where leaf queries have JOINs on distribution keys +-- can be pushded down +SELECT COUNT(user_id) +FROM + (SELECT user_id AS user_id + FROM + (SELECT user_id AS user_id FROM users_table_part JOIN events_table_part USING (user_id) WHERE users_table_part.value_1 = 1 + UNION ALL SELECT user_id AS user_id FROM users_table_part JOIN events_table_part USING (user_id) WHERE users_table_part.value_1 = 2) AS bar + UNION ALL SELECT user_id AS user_id FROM users_table_part JOIN events_table_part USING (user_id) WHERE users_table_part.value_1 = 3 + UNION ALL + + SELECT user_id AS user_id + FROM + (SELECT user_id AS user_id FROM users_table_part JOIN events_table_part USING (user_id) WHERE users_table_part.value_1 = 4 + UNION ALL SELECT user_id AS user_id FROM users_table_part JOIN events_table_part USING (user_id) WHERE users_table_part.value_1 = 5) AS bar + UNION ALL + (SELECT user_id AS user_id + FROM + (SELECT user_id AS user_id FROM users_table_part JOIN events_table_part USING (user_id) WHERE users_table_part.value_1 = 6 + UNION ALL SELECT user_id AS user_id FROM users_table_part JOIN events_table_part USING (user_id) WHERE users_table_part.value_1 = 7) AS bar + UNION ALL SELECT user_id AS user_id FROM users_table_part JOIN events_table_part USING (user_id) WHERE users_table_part.value_1 = 8 GROUP BY user_id)) AS bar +ORDER BY 1 DESC LIMIT 10; + +-- a union all query deep down inside a subquery can still be pushed down +SELECT COUNT(user_id) FROM ( +SELECT user_id, random() FROM ( +SELECT user_id, random() FROM ( +SELECT user_id, random() +FROM + (SELECT user_id AS user_id + FROM + (SELECT user_id AS user_id FROM users_table_part JOIN events_table_part USING (user_id) WHERE users_table_part.value_1 = 1 + UNION ALL SELECT user_id AS user_id FROM users_table_part JOIN events_table_part USING (user_id) WHERE users_table_part.value_1 = 2) AS bar + UNION ALL SELECT user_id AS user_id FROM users_table_part JOIN events_table_part USING (user_id) WHERE users_table_part.value_1 = 3 + UNION ALL + SELECT user_id AS user_id + FROM + (SELECT user_id AS user_id FROM users_table_part JOIN events_table_part USING (user_id) WHERE users_table_part.value_1 = 4 + UNION ALL SELECT user_id AS user_id FROM users_table_part JOIN events_table_part USING (user_id) WHERE users_table_part.value_1 = 5) AS bar + UNION ALL + (SELECT user_id AS user_id + FROM + (SELECT DISTINCT user_id AS user_id FROM users_table_part JOIN events_table_part USING (user_id) WHERE users_table_part.value_1 = 6 + UNION ALL SELECT user_id AS user_id FROM users_table_part JOIN events_table_part USING (user_id) WHERE users_table_part.value_1 = 7 AND events_table_part.user_id IN (SELECT user_id FROM users_table_part WHERE users_table_part.value_2 = 3 AND events_table_part.user_id IN (SELECT user_id FROM users_table_part WHERE value_2 = 3))) AS bar + UNION ALL SELECT user_id AS user_id FROM users_table_part JOIN events_table_part USING (user_id) WHERE users_table_part.value_1 = 8 GROUP BY user_id)) AS bar + WHERE user_id < 2000 ) as level_1 ) as level_2 ) as level_3 +ORDER BY 1 DESC LIMIT 10; + +-- safe to pushdown +SELECT DISTINCT user_id FROM ( + SELECT * FROM + (SELECT user_id FROM users_table_part UNION ALL SELECT user_id FROM users_table_part) as foo + JOIN + (SELECT user_id FROM users_table_part UNION ALL SELECT user_id FROM users_table_part) as bar + USING (user_id) + UNION ALL + SELECT * FROM + (SELECT user_id FROM users_table_part UNION ALL SELECT user_id FROM users_table_part) as foo + JOIN + (SELECT user_id FROM users_table_part UNION ALL SELECT user_id FROM users_table_part) as bar + USING (user_id) +) as foo1 ORDER BY 1 LIMIT 1; + +-- safe to pushdown +SELECT DISTINCT user_id FROM ( + SELECT * FROM ( + SELECT * FROM + (SELECT user_id FROM users_table_part UNION ALL SELECT user_id FROM users_table_part) as foo + JOIN + (SELECT user_id FROM users_table_part UNION ALL SELECT user_id FROM users_table_part) as bar + USING (user_id) + UNION ALL + SELECT * FROM + (SELECT user_id FROM users_table_part UNION ALL SELECT user_id FROM users_table_part) as foo + JOIN + (SELECT user_id FROM users_table_part UNION ALL SELECT user_id FROM users_table_part) as bar + USING (user_id)) as bar +) as foo1 ORDER BY 1 LIMIT 1; + +-- safe to pushdown +SELECT DISTINCT user_id FROM + (SELECT user_id FROM users_table_part UNION ALL SELECT user_id FROM users_table_part) as foo + JOIN + (SELECT user_id FROM users_table_part UNION ALL SELECT user_id FROM users_table_part) as bar +USING (user_id) +ORDER BY 1 LIMIT 1; + +RESET client_min_messages; +DROP SCHEMA union_pushdown CASCADE;