diff --git a/src/backend/distributed/planner/insert_select_planner.c b/src/backend/distributed/planner/insert_select_planner.c index c7cc50fb8..896163784 100644 --- a/src/backend/distributed/planner/insert_select_planner.c +++ b/src/backend/distributed/planner/insert_select_planner.c @@ -464,6 +464,13 @@ RouterModifyTaskForShardInterval(Query *originalQuery, ShardInterval *shardInter } shardOpExpressions = ShardIntervalOpExpressions(shardInterval, rteIndex); + + /* means it is a reference table and do not add any shard interval information */ + if (shardOpExpressions == NIL) + { + continue; + } + shardRestrictionList = make_simple_restrictinfo((Expr *) shardOpExpressions); extendedBaseRestrictInfo = lappend(extendedBaseRestrictInfo, shardRestrictionList); diff --git a/src/backend/distributed/planner/multi_logical_planner.c b/src/backend/distributed/planner/multi_logical_planner.c index 8641af7fd..a0ef2c739 100644 --- a/src/backend/distributed/planner/multi_logical_planner.c +++ b/src/backend/distributed/planner/multi_logical_planner.c @@ -997,6 +997,23 @@ TargetListOnPartitionColumn(Query *query, List *targetEntryList) Expr *targetExpression = targetEntry->expr; bool isPartitionColumn = IsPartitionColumn(targetExpression, query); + Oid relationId = InvalidOid; + Var *column = NULL; + + FindReferencedTableColumn(targetExpression, NIL, query, &relationId, &column); + + /* + * If the expression belongs to reference table directly returns true, + * since logic of caller function checks whether it can find the necessaary + * data from each node. + */ + if (IsDistributedTable(relationId) && PartitionMethod(relationId) == + DISTRIBUTE_BY_NONE) + { + targetListOnPartitionColumn = true; + break; + } + if (isPartitionColumn) { FieldSelect *compositeField = CompositeFieldRecursive(targetExpression, @@ -2728,6 +2745,50 @@ ExtractRangeTableRelationWalker(Node *node, List **rangeTableRelationList) } +/* Get the list of relations from the given node. Note that the difference between + * this function and ExtractRangeTableRelationWalker is that this one recursively + * walk into range table entries if it can. + */ +bool +ExtractRTRelationFromNode(Node *node, List **rangeTableList) +{ + bool walkIsComplete = false; + if (node == NULL) + { + return false; + } + + if (IsA(node, RangeTblEntry)) + { + RangeTblEntry *rangeTableEntry = (RangeTblEntry *) node; + + if (rangeTableEntry->rtekind == RTE_RELATION && + rangeTableEntry->relkind != RELKIND_VIEW) + { + (*rangeTableList) = lappend(*rangeTableList, rangeTableEntry); + } + else if (rangeTableEntry->rtekind == RTE_SUBQUERY) + { + walkIsComplete = query_tree_walker(rangeTableEntry->subquery, + ExtractRTRelationFromNode, + rangeTableList, QTW_EXAMINE_RTES); + } + } + else if (IsA(node, Query)) + { + walkIsComplete = query_tree_walker((Query *) node, ExtractRTRelationFromNode, + rangeTableList, QTW_EXAMINE_RTES); + } + else + { + walkIsComplete = expression_tree_walker(node, ExtractRTRelationFromNode, + rangeTableList); + } + + return walkIsComplete; +} + + /* * ExtractRangeTableEntryWalker walks over a query tree, and finds all range * table entries. For recursing into the query tree, this function uses the diff --git a/src/backend/distributed/planner/multi_physical_planner.c b/src/backend/distributed/planner/multi_physical_planner.c index fcee39f40..9e3fd4458 100644 --- a/src/backend/distributed/planner/multi_physical_planner.c +++ b/src/backend/distributed/planner/multi_physical_planner.c @@ -127,6 +127,8 @@ static Job * BuildJobTreeTaskList(Job *jobTree, static List * SubquerySqlTaskList(Job *job, PlannerRestrictionContext *plannerRestrictionContext); static void ErrorIfUnsupportedShardDistribution(Query *query); +static void ErrorIfUnsupportedJoinReferenceTable( + PlannerRestrictionContext *plannerRestrictionContext); static bool CoPartitionedTables(Oid firstRelationId, Oid secondRelationId); static bool ShardIntervalsEqual(FmgrInfo *comparisonFunction, ShardInterval *firstInterval, @@ -195,6 +197,7 @@ static StringInfo MergeTableQueryString(uint32 taskIdIndex, List *targetEntryLis static StringInfo IntermediateTableQueryString(uint64 jobId, uint32 taskIdIndex, Query *reduceQuery); static uint32 FinalTargetEntryCount(List *targetEntryList); +static bool ReferenceTableExist(PlannerInfo *plannerInfo, RelOptInfo *relationInfo); /* @@ -2038,6 +2041,9 @@ SubquerySqlTaskList(Job *job, PlannerRestrictionContext *plannerRestrictionConte /* error if shards are not co-partitioned */ ErrorIfUnsupportedShardDistribution(subquery); + /* error if unsupported join on reference tables */ + ErrorIfUnsupportedJoinReferenceTable(plannerRestrictionContext); + /* get list of all range tables in subquery tree */ ExtractRangeTableRelationWalker((Node *) subquery, &rangeTableList); @@ -2061,7 +2067,16 @@ SubquerySqlTaskList(Job *job, PlannerRestrictionContext *plannerRestrictionConte break; } - Assert(targetCacheEntry != NULL); + /* + * That means all table are reference table and we can assign any reference + * table as an anchor one . + */ + if (targetCacheEntry == NULL) + { + RangeTblEntry *rangeTableEntry = (RangeTblEntry *) linitial(rangeTableList); + relationId = rangeTableEntry->relid; + targetCacheEntry = DistributedTableCacheEntry(relationId); + } shardCount = targetCacheEntry->shardIntervalArrayLength; for (shardOffset = 0; shardOffset < shardCount; shardOffset++) @@ -2088,6 +2103,146 @@ SubquerySqlTaskList(Job *job, PlannerRestrictionContext *plannerRestrictionConte } +/* + * ErrorIfUnsupportedJoinReferenceTable errors out if there exists a outer join + * exist between reference table and distributed tables. + */ +static void +ErrorIfUnsupportedJoinReferenceTable(PlannerRestrictionContext *plannerRestrictionContext) +{ + List *joinRestrictionList = + plannerRestrictionContext->joinRestrictionContext->joinRestrictionList; + ListCell *joinRestrictionCell = NULL; + + foreach(joinRestrictionCell, joinRestrictionList) + { + JoinRestriction *joinRestriction = (JoinRestriction *) lfirst( + joinRestrictionCell); + JoinType joinType = joinRestriction->joinType; + PlannerInfo *plannerInfo = joinRestriction->plannerInfo; + RelOptInfo *innerrel = joinRestriction->innerrel; + RelOptInfo *outerrel = joinRestriction->outerrel; + + switch (joinType) + { + case JOIN_SEMI: + { + if (ReferenceTableExist(plannerInfo, outerrel)) + { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("can not plan query having reference table on" + " the left part of semi join"))); + } + } + break; + + case JOIN_ANTI: + { + if (ReferenceTableExist(plannerInfo, innerrel)) + { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("can not plan query having reference table on" + " the left part of anti join"))); + } + } + break; + + case JOIN_LEFT: + { + if (ReferenceTableExist(plannerInfo, outerrel)) + { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("can not plan query having reference table on" + " the left part of left join"))); + } + } + break; + + case JOIN_RIGHT: + { + if (ReferenceTableExist(plannerInfo, innerrel)) + { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("can not plan query having reference table on" + " the right part of right join"))); + } + } + break; + + case JOIN_FULL: + { + if (ReferenceTableExist(plannerInfo, innerrel) || ReferenceTableExist( + plannerInfo, outerrel)) + { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg( + "can not plan query having reference table as a" + " part of full join"))); + } + } + break; + + default: + { } + break; + } + } +} + + +/* + * ReferenceTableExist check whether the relationInfo has reference table. + * Since relation ids of relationInfo indexes to the range table entry list of + * query, query is also passed. + */ +static bool +ReferenceTableExist(PlannerInfo *plannerInfo, RelOptInfo *relationInfo) +{ + Relids relids = bms_copy(relationInfo->relids); + int relationId = -1; + + while ((relationId = bms_first_member(relids)) >= 0) + { + RangeTblEntry *rangeTableEntry = plannerInfo->simple_rte_array[relationId]; + + /* relationInfo has this range table entry */ + if (RTEContainsReferenceTable(rangeTableEntry)) + { + return true; + } + } + + return false; +} + + +/* + * RTEContainsReferenceTable checks whether there exist a reference table in the + * given range table entry. + */ +bool +RTEContainsReferenceTable(RangeTblEntry *rangeTableEntry) +{ + List *relationList = NIL; + ListCell *relationCell = NULL; + ExtractRTRelationFromNode((Node *) rangeTableEntry, &relationList); + + foreach(relationCell, relationList) + { + RangeTblEntry *rangeTableEntry = (RangeTblEntry *) lfirst(relationCell); + Oid relationId = rangeTableEntry->relid; + + if (IsDistributedTable(relationId) && PartitionMethod(relationId) == + DISTRIBUTE_BY_NONE) + { + return true; + } + } + + return false; +} + + /* * ErrorIfUnsupportedShardDistribution gets list of relations in the given query * and checks if two conditions below hold for them, otherwise it errors out. @@ -2121,10 +2276,8 @@ ErrorIfUnsupportedShardDistribution(Query *query) } else { - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("cannot push down this subquery"), - errdetail("Currently range and hash partitioned " - "relations are supported"))); + /* do not need to handle reference tables */ + continue; } } @@ -2141,6 +2294,13 @@ ErrorIfUnsupportedShardDistribution(Query *query) Oid relationId = lfirst_oid(relationIdCell); bool coPartitionedTables = false; Oid currentRelationId = relationId; + char partitionMethod = PartitionMethod(relationId); + + /* do not need to check reference tables */ + if (partitionMethod == DISTRIBUTE_BY_NONE) + { + continue; + } /* get shard list of first relation and continue for the next relation */ if (relationIndex == 0) @@ -2298,9 +2458,6 @@ SubqueryTaskCreate(Query *originalQuery, ShardInterval *shardInterval, RestrictInfo *shardRestrictionList = NULL; DeferredErrorMessage *planningError = NULL; - /* such queries should go through router planner */ - Assert(!restrictionContext->allReferenceTables); - /* * Add the restriction qual parameter value in all baserestrictinfos. * Note that this has to be done on a copy, as the originals are needed @@ -2315,6 +2472,12 @@ SubqueryTaskCreate(Query *originalQuery, ShardInterval *shardInterval, shardOpExpressions = ShardIntervalOpExpressions(shardInterval, rteIndex); + /* means it is a reference table and do not add any shard interval info */ + if (shardOpExpressions == NIL) + { + continue; + } + shardRestrictionList = make_simple_restrictinfo((Expr *) shardOpExpressions); extendedBaseRestrictInfo = lappend(extendedBaseRestrictInfo, shardRestrictionList); diff --git a/src/backend/distributed/planner/multi_planner.c b/src/backend/distributed/planner/multi_planner.c index 74af951f2..5de8d5bd1 100644 --- a/src/backend/distributed/planner/multi_planner.c +++ b/src/backend/distributed/planner/multi_planner.c @@ -740,6 +740,8 @@ multi_join_restriction_hook(PlannerInfo *root, joinRestriction->joinType = jointype; joinRestriction->joinRestrictInfoList = restrictInfoList; joinRestriction->plannerInfo = root; + joinRestriction->innerrel = innerrel; + joinRestriction->outerrel = outerrel; joinRestrictionContext->joinRestrictionList = lappend(joinRestrictionContext->joinRestrictionList, joinRestriction); diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index ac71a5f29..7734ae32c 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -300,10 +300,8 @@ ShardIntervalOpExpressions(ShardInterval *shardInterval, Index rteIndex) } else { - ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("cannot create shard interval operator expression for " - "distributed relations other than hash, range and append distributed " - "relations"))); + /* do not add any shard range interval for reference tables */ + return NIL; } /* build the base expression for constraint */ diff --git a/src/include/distributed/multi_logical_planner.h b/src/include/distributed/multi_logical_planner.h index 594f2ce11..f73deb79b 100644 --- a/src/include/distributed/multi_logical_planner.h +++ b/src/include/distributed/multi_logical_planner.h @@ -206,6 +206,7 @@ extern List * TableEntryList(List *rangeTableList); extern List * UsedTableEntryList(Query *query); extern bool ExtractRangeTableRelationWalker(Node *node, List **rangeTableList); extern bool ExtractRangeTableEntryWalker(Node *node, List **rangeTableList); +extern bool ExtractRTRelationFromNode(Node *node, List **rangeTableList); extern List * pull_var_clause_default(Node *node); extern bool OperatorImplementsEquality(Oid opno); diff --git a/src/include/distributed/multi_physical_planner.h b/src/include/distributed/multi_physical_planner.h index bbff3a83e..5f2f08f1d 100644 --- a/src/include/distributed/multi_physical_planner.h +++ b/src/include/distributed/multi_physical_planner.h @@ -286,6 +286,7 @@ extern Const * MakeInt4Constant(Datum constantValue); extern int CompareShardPlacements(const void *leftElement, const void *rightElement); extern bool ShardIntervalsOverlap(ShardInterval *firstInterval, ShardInterval *secondInterval); +extern bool RTEContainsReferenceTable(RangeTblEntry *rangeTableEntry); /* function declarations for Task and Task list operations */ extern bool TasksEqual(const Task *a, const Task *b); diff --git a/src/include/distributed/multi_planner.h b/src/include/distributed/multi_planner.h index 952f2fa53..25c2ee4c5 100644 --- a/src/include/distributed/multi_planner.h +++ b/src/include/distributed/multi_planner.h @@ -52,6 +52,8 @@ typedef struct JoinRestriction JoinType joinType; List *joinRestrictInfoList; PlannerInfo *plannerInfo; + RelOptInfo *innerrel; + RelOptInfo *outerrel; } JoinRestriction; typedef struct PlannerRestrictionContext diff --git a/src/test/regress/expected/multi_subquery_complex_reference_clause.out b/src/test/regress/expected/multi_subquery_complex_reference_clause.out new file mode 100644 index 000000000..6a28a0226 --- /dev/null +++ b/src/test/regress/expected/multi_subquery_complex_reference_clause.out @@ -0,0 +1,376 @@ +-- +-- multi subquery complex queries aims to expand existing subquery pushdown +-- regression tests to cover more caeses +-- the tables that are used depends to multi_insert_select_behavioral_analytics_create_table.sql +-- +-- We don't need shard id sequence here, so commented out to prevent conflicts with concurrent tests +-- ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1400000; +ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1400000; + +SET citus.enable_router_execution TO FALSE; +CREATE TABLE events_reference_table as SELECT * FROM events_table; +CREATE TABLE users_reference_table as SELECT * FROM users_table; +SELECT create_reference_table('events_reference_table'); +NOTICE: Copying data from local table... + create_reference_table +------------------------ + +(1 row) + +SELECT create_reference_table('users_reference_table'); +NOTICE: Copying data from local table... + create_reference_table +------------------------ + +(1 row) + +-- LATERAL JOINs used with INNER JOINs with reference tables +SET citus.subquery_pushdown to ON; +SELECT user_id, lastseen +FROM + (SELECT + "some_users_data".user_id, lastseen + FROM + (SELECT + filter_users_1.user_id, time AS lastseen + FROM + (SELECT + user_where_1_1.user_id + FROM + (SELECT + "users"."user_id" + FROM + users_reference_table as "users" + WHERE + user_id > 12 and user_id < 16 and value_1 > 20) user_where_1_1 + INNER JOIN + (SELECT + "users"."user_id" + FROM + users_reference_table as "users" + WHERE + user_id > 12 and user_id < 16 and value_2 > 60) user_where_1_join_1 + ON ("user_where_1_1".user_id = "user_where_1_join_1".user_id)) + filter_users_1 + JOIN LATERAL + (SELECT + user_id, time + FROM + events_reference_table as "events" + WHERE + user_id > 12 and user_id < 16 AND + user_id = filter_users_1.user_id + ORDER BY + time DESC + LIMIT 1) "last_events_1" + ON TRUE + ORDER BY + time DESC + LIMIT 10) "some_recent_users" + JOIN LATERAL + (SELECT + "users".user_id + FROM + users_reference_table as "users" + WHERE + "users"."user_id" = "some_recent_users"."user_id" AND + "users"."value_2" > 70 + LIMIT 1) "some_users_data" + ON TRUE + ORDER BY + lastseen DESC + LIMIT 10) "some_users" +ORDER BY + user_id DESC +LIMIT 10; + user_id | lastseen +---------+--------------------------------- + 14 | Tue Jan 21 05:46:51.286381 2014 + 14 | Tue Jan 21 05:46:51.286381 2014 + 14 | Tue Jan 21 05:46:51.286381 2014 + 14 | Tue Jan 21 05:46:51.286381 2014 + 14 | Tue Jan 21 05:46:51.286381 2014 + 14 | Tue Jan 21 05:46:51.286381 2014 + 14 | Tue Jan 21 05:46:51.286381 2014 + 14 | Tue Jan 21 05:46:51.286381 2014 + 14 | Tue Jan 21 05:46:51.286381 2014 + 14 | Tue Jan 21 05:46:51.286381 2014 +(10 rows) + +SET citus.subquery_pushdown to OFF; +-- NESTED INNER JOINs with reference tables +SELECT + count(*) AS value, "generated_group_field" + FROM + (SELECT + DISTINCT "pushedDownQuery"."real_user_id", "generated_group_field" + FROM + (SELECT + "eventQuery"."real_user_id", "eventQuery"."time", random(), ("eventQuery"."value_2") AS "generated_group_field" + FROM + (SELECT + * + FROM + (SELECT + "events"."time", "events"."user_id", "events"."value_2" + FROM + events_reference_table as "events" + WHERE + user_id > 10 and user_id < 40 AND event_type IN (40, 41, 42, 43, 44, 45) ) "temp_data_queries" + INNER JOIN + (SELECT + user_where_1_1.real_user_id + FROM + (SELECT + "users"."user_id" as real_user_id + FROM + users_reference_table as "users" + WHERE + user_id > 10 and user_id < 40 and value_2 > 50 ) user_where_1_1 + INNER JOIN + (SELECT + "users"."user_id" + FROM + users_reference_table as "users" + WHERE + user_id > 10 and user_id < 40 and value_3 > 50 ) user_where_1_join_1 + ON ("user_where_1_1".real_user_id = "user_where_1_join_1".user_id)) "user_filters_1" + ON ("temp_data_queries".user_id = "user_filters_1".real_user_id)) "eventQuery") "pushedDownQuery") "pushedDownQuery" +GROUP BY + "generated_group_field" +ORDER BY + generated_group_field DESC, value DESC; + value | generated_group_field +-------+----------------------- + 1 | 966 + 1 | 917 + 1 | 905 + 1 | 868 + 1 | 836 + 1 | 791 + 1 | 671 + 1 | 642 + 1 | 358 + 1 | 317 + 1 | 307 + 1 | 302 + 1 | 214 + 1 | 166 + 1 | 116 + 1 | 1 +(16 rows) + +-- single level inner joins with reference tables +SELECT + "value_3", count(*) AS cnt +FROM + (SELECT + "value_3", "user_id", random() + FROM + (SELECT + users_in_segment_1.user_id, value_3 + FROM + (SELECT + user_id, value_3 * 2 as value_3 + FROM + (SELECT + user_id, value_3 + FROM + (SELECT + "users"."user_id", value_3 + FROM + users_reference_table as "users" + WHERE + user_id > 10 and user_id < 40 and value_2 > 30 + ) simple_user_where_1 + ) all_buckets_1 + ) users_in_segment_1 + JOIN + (SELECT + "users"."user_id" + FROM + users_reference_table as "users" + WHERE + user_id > 10 and user_id < 40 and value_2 > 60 + ) some_users_data + ON ("users_in_segment_1".user_id = "some_users_data".user_id) + ) segmentalias_1) "tempQuery" +GROUP BY "value_3" +ORDER BY cnt, value_3 DESC LIMIT 10; + value_3 | cnt +---------+----- + 556 | 75 + 228 | 75 + 146 | 75 + 70 | 75 + 1442 | 79 + 1232 | 79 + 1090 | 79 + 1012 | 79 + 886 | 79 + 674 | 79 +(10 rows) + +-- nested LATERAL JOINs with reference tables +SET citus.subquery_pushdown to ON; +SELECT * +FROM + (SELECT "some_users_data".user_id, "some_recent_users".value_3 + FROM + (SELECT + filter_users_1.user_id, value_3 + FROM + (SELECT + "users"."user_id" + FROM + users_reference_table as "users" + WHERE + user_id > 20 and user_id < 70 and users.value_2 = 200) filter_users_1 + JOIN LATERAL + (SELECT + user_id, value_3 + FROM + events_reference_table as "events" + WHERE + user_id > 20 and user_id < 70 AND + ("events".user_id = "filter_users_1".user_id) + ORDER BY + value_3 DESC + LIMIT 1) "last_events_1" ON true + ORDER BY value_3 DESC + LIMIT 10) "some_recent_users" + JOIN LATERAL + (SELECT + "users".user_id + FROM + users_reference_table as "users" + WHERE + "users"."user_id" = "some_recent_users"."user_id" AND + users.value_2 > 200 + LIMIT 1) "some_users_data" ON true + ORDER BY + value_3 DESC +LIMIT 10) "some_users" +ORDER BY + value_3 DESC +LIMIT 10; + user_id | value_3 +---------+--------- + 44 | 998 + 65 | 996 + 66 | 996 + 37 | 995 + 57 | 989 + 21 | 985 +(6 rows) + +SET citus.subquery_pushdown to OFF; +-- LEFT JOINs used with INNER JOINs should error out since reference table exist in the +-- left side of the LEFT JOIN. +SELECT +count(*) AS cnt, "generated_group_field" + FROM + (SELECT + "eventQuery"."user_id", random(), generated_group_field + FROM + (SELECT + "multi_group_wrapper_1".*, generated_group_field, random() + FROM + (SELECT * + FROM + (SELECT + "events"."time", "events"."user_id" as event_user_id + FROM + events_table as "events" + WHERE + user_id > 80) "temp_data_queries" + INNER JOIN + (SELECT + "users"."user_id" + FROM + users_reference_table as "users" + WHERE + user_id > 80 and value_2 = 5) "user_filters_1" + ON ("temp_data_queries".event_user_id = "user_filters_1".user_id)) AS "multi_group_wrapper_1" + LEFT JOIN + (SELECT + "users"."user_id" AS "user_id", value_2 AS "generated_group_field" + FROM + users_table as "users") "left_group_by_1" + ON ("left_group_by_1".user_id = "multi_group_wrapper_1".event_user_id)) "eventQuery") "pushedDownQuery" + group BY + "generated_group_field" + ORDER BY + cnt DESC, generated_group_field ASC + LIMIT 10; +ERROR: can not plan query having reference table on the left part of left join + -- RIGHT JOINs used with INNER JOINs should error out since reference table exist in the +-- right side of the RIGHT JOIN. +SELECT +count(*) AS cnt, "generated_group_field" + FROM + (SELECT + "eventQuery"."user_id", random(), generated_group_field + FROM + (SELECT + "multi_group_wrapper_1".*, generated_group_field, random() + FROM + (SELECT * + FROM + (SELECT + "events"."time", "events"."user_id" as event_user_id + FROM + events_table as "events" + WHERE + user_id > 80) "temp_data_queries" + INNER JOIN + (SELECT + "users"."user_id" + FROM + users_table as "users" + WHERE + user_id > 80 and value_2 = 5) "user_filters_1" + ON ("temp_data_queries".event_user_id = "user_filters_1".user_id)) AS "multi_group_wrapper_1" + RIGHT JOIN + (SELECT + "users"."user_id" AS "user_id", value_2 AS "generated_group_field" + FROM + users_reference_table as "users") "right_group_by_1" + ON ("right_group_by_1".user_id = "multi_group_wrapper_1".event_user_id)) "eventQuery") "pushedDownQuery" + group BY + "generated_group_field" + ORDER BY + cnt DESC, generated_group_field ASC + LIMIT 10; +ERROR: can not plan query having reference table on the left part of left join + -- Outer subquery with reference table +SELECT "some_users_data".user_id, lastseen +FROM + (SELECT user_id, max(time) AS lastseen + FROM + (SELECT user_id, time + FROM + (SELECT + user_id, time + FROM + events_reference_table as "events" + WHERE + user_id > 10 and user_id < 40) "events_1" + ORDER BY + time DESC) "recent_events_1" + GROUP BY + user_id + ORDER BY + max(TIME) DESC) "some_recent_users" + FULL JOIN + (SELECT + "users".user_id + FROM + users_table as "users" + WHERE + users.value_2 > 50 and users.value_2 < 55) "some_users_data" + ON "some_users_data"."user_id" = "some_recent_users"."user_id" +ORDER BY + user_id +limit 50; +ERROR: can not plan query having reference table as a part of full join diff --git a/src/test/regress/expected/multi_subquery_in_where_reference_clause.out b/src/test/regress/expected/multi_subquery_in_where_reference_clause.out new file mode 100644 index 000000000..0159510b2 --- /dev/null +++ b/src/test/regress/expected/multi_subquery_in_where_reference_clause.out @@ -0,0 +1,144 @@ +-- +-- queries to test the subquery pushdown on reference tables +-- subqueries in WHERE with greater operator +SELECT + user_id +FROM + users_table +WHERE + value_2 > + (SELECT + max(value_2) + FROM + events_reference_table + WHERE + users_table.user_id = events_reference_table.user_id AND event_type = 50 + GROUP BY + user_id + ) +GROUP BY user_id +HAVING count(*) > 66 +ORDER BY user_id +LIMIT 5; + user_id +--------- + 49 + 55 + 56 + 63 +(4 rows) + +-- have reference table without any equality, should error out +SELECT + user_id +FROM + users_table +WHERE + value_2 > + (SELECT + max(value_2) + FROM + events_reference_table + WHERE + event_type = 50 + GROUP BY + user_id + ) +GROUP BY user_id +HAVING count(*) > 66 +ORDER BY user_id +LIMIT 5; +ERROR: cannot pushdown the subquery since all relations are not joined using distribution keys +DETAIL: Each relation should be joined with at least one another relation using distribution keys and equality operator. +-- users that appeared more than 118 times, should run since the reference table +-- on the right side of the semi join +SELECT + user_id +FROM + users_table +WHERE 118 <= + (SELECT + count(*) + FROM + events_reference_table + WHERE + users_table.user_id = events_reference_table.user_id + GROUP BY + user_id) +GROUP BY + user_id +ORDER BY + user_id; + user_id +--------- + 13 + 17 + 23 + 25 +(4 rows) + +-- should error out since reference table exist on the left side +-- of the left lateral join +SELECT user_id, value_2 FROM users_table WHERE + value_1 > 101 AND value_1 < 110 + AND value_2 >= 5 + AND user_id IN + ( + SELECT + e1.user_id + FROM ( + -- Get the first time each user viewed the homepage. + SELECT + user_id, + 1 AS view_homepage, + min(time) AS view_homepage_time + FROM events_reference_table + WHERE + event_type IN (10, 20, 30, 40, 50, 60, 70, 80, 90) + GROUP BY user_id + ) e1 LEFT JOIN LATERAL ( + SELECT + user_id, + 1 AS use_demo, + time AS use_demo_time + FROM events_reference_table + WHERE + user_id = e1.user_id AND + event_type IN (11, 21, 31, 41, 51, 61, 71, 81, 91) + ORDER BY time + ) e2 ON true LEFT JOIN LATERAL ( + SELECT + user_id, + 1 AS enter_credit_card, + time AS enter_credit_card_time + FROM events_reference_table + WHERE + user_id = e2.user_id AND + event_type IN (12, 22, 32, 42, 52, 62, 72, 82, 92) + ORDER BY time + ) e3 ON true LEFT JOIN LATERAL ( + SELECT + 1 AS submit_card_info, + user_id, + time AS enter_credit_card_time + FROM events_reference_table + WHERE + user_id = e3.user_id AND + event_type IN (13, 23, 33, 43, 53, 63, 73, 83, 93) + ORDER BY time + ) e4 ON true LEFT JOIN LATERAL ( + SELECT + 1 AS see_bought_screen + FROM events_reference_table + WHERE + user_id = e4.user_id AND + event_type IN (14, 24, 34, 44, 54, 64, 74, 84, 94) + ORDER BY time + ) e5 ON true + group by e1.user_id + HAVING sum(submit_card_info) > 0 +) +ORDER BY 1, 2; +ERROR: can not plan query having reference table on the left part of left join +DROP TABLE events_reference_table; +DROP TABLE users_reference_table; diff --git a/src/test/regress/multi_schedule b/src/test/regress/multi_schedule index 9311b5d23..b860d0e71 100644 --- a/src/test/regress/multi_schedule +++ b/src/test/regress/multi_schedule @@ -51,6 +51,8 @@ test: multi_deparse_shard_query multi_distributed_transaction_id test: multi_basic_queries multi_complex_expressions test: multi_explain test: multi_subquery multi_subquery_complex_queries multi_subquery_behavioral_analytics +test: multi_subquery_complex_reference_clause +test: multi_subquery_in_where_reference_clause test: multi_subquery_union multi_subquery_in_where_clause multi_subquery_misc test: multi_reference_table test: multi_outer_join_reference diff --git a/src/test/regress/sql/multi_subquery_complex_reference_clause.sql b/src/test/regress/sql/multi_subquery_complex_reference_clause.sql new file mode 100644 index 000000000..337b4f52d --- /dev/null +++ b/src/test/regress/sql/multi_subquery_complex_reference_clause.sql @@ -0,0 +1,313 @@ +-- +-- multi subquery complex queries aims to expand existing subquery pushdown +-- regression tests to cover more caeses +-- the tables that are used depends to multi_insert_select_behavioral_analytics_create_table.sql +-- + +-- We don't need shard id sequence here, so commented out to prevent conflicts with concurrent tests +-- ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1400000; +ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1400000; + +SET citus.enable_router_execution TO FALSE; + +CREATE TABLE events_reference_table as SELECT * FROM events_table; +CREATE TABLE users_reference_table as SELECT * FROM users_table; + +SELECT create_reference_table('events_reference_table'); +SELECT create_reference_table('users_reference_table'); + +-- LATERAL JOINs used with INNER JOINs with reference tables +SET citus.subquery_pushdown to ON; +SELECT user_id, lastseen +FROM + (SELECT + "some_users_data".user_id, lastseen + FROM + (SELECT + filter_users_1.user_id, time AS lastseen + FROM + (SELECT + user_where_1_1.user_id + FROM + (SELECT + "users"."user_id" + FROM + users_reference_table as "users" + WHERE + user_id > 12 and user_id < 16 and value_1 > 20) user_where_1_1 + INNER JOIN + (SELECT + "users"."user_id" + FROM + users_reference_table as "users" + WHERE + user_id > 12 and user_id < 16 and value_2 > 60) user_where_1_join_1 + ON ("user_where_1_1".user_id = "user_where_1_join_1".user_id)) + filter_users_1 + JOIN LATERAL + (SELECT + user_id, time + FROM + events_reference_table as "events" + WHERE + user_id > 12 and user_id < 16 AND + user_id = filter_users_1.user_id + ORDER BY + time DESC + LIMIT 1) "last_events_1" + ON TRUE + ORDER BY + time DESC + LIMIT 10) "some_recent_users" + JOIN LATERAL + (SELECT + "users".user_id + FROM + users_reference_table as "users" + WHERE + "users"."user_id" = "some_recent_users"."user_id" AND + "users"."value_2" > 70 + LIMIT 1) "some_users_data" + ON TRUE + ORDER BY + lastseen DESC + LIMIT 10) "some_users" +ORDER BY + user_id DESC +LIMIT 10; +SET citus.subquery_pushdown to OFF; + +-- NESTED INNER JOINs with reference tables +SELECT + count(*) AS value, "generated_group_field" + FROM + (SELECT + DISTINCT "pushedDownQuery"."real_user_id", "generated_group_field" + FROM + (SELECT + "eventQuery"."real_user_id", "eventQuery"."time", random(), ("eventQuery"."value_2") AS "generated_group_field" + FROM + (SELECT + * + FROM + (SELECT + "events"."time", "events"."user_id", "events"."value_2" + FROM + events_reference_table as "events" + WHERE + user_id > 10 and user_id < 40 AND event_type IN (40, 41, 42, 43, 44, 45) ) "temp_data_queries" + INNER JOIN + (SELECT + user_where_1_1.real_user_id + FROM + (SELECT + "users"."user_id" as real_user_id + FROM + users_reference_table as "users" + WHERE + user_id > 10 and user_id < 40 and value_2 > 50 ) user_where_1_1 + INNER JOIN + (SELECT + "users"."user_id" + FROM + users_reference_table as "users" + WHERE + user_id > 10 and user_id < 40 and value_3 > 50 ) user_where_1_join_1 + ON ("user_where_1_1".real_user_id = "user_where_1_join_1".user_id)) "user_filters_1" + ON ("temp_data_queries".user_id = "user_filters_1".real_user_id)) "eventQuery") "pushedDownQuery") "pushedDownQuery" +GROUP BY + "generated_group_field" +ORDER BY + generated_group_field DESC, value DESC; + +-- single level inner joins with reference tables +SELECT + "value_3", count(*) AS cnt +FROM + (SELECT + "value_3", "user_id", random() + FROM + (SELECT + users_in_segment_1.user_id, value_3 + FROM + (SELECT + user_id, value_3 * 2 as value_3 + FROM + (SELECT + user_id, value_3 + FROM + (SELECT + "users"."user_id", value_3 + FROM + users_reference_table as "users" + WHERE + user_id > 10 and user_id < 40 and value_2 > 30 + ) simple_user_where_1 + ) all_buckets_1 + ) users_in_segment_1 + JOIN + (SELECT + "users"."user_id" + FROM + users_reference_table as "users" + WHERE + user_id > 10 and user_id < 40 and value_2 > 60 + ) some_users_data + ON ("users_in_segment_1".user_id = "some_users_data".user_id) + ) segmentalias_1) "tempQuery" +GROUP BY "value_3" +ORDER BY cnt, value_3 DESC LIMIT 10; + +-- nested LATERAL JOINs with reference tables +SET citus.subquery_pushdown to ON; +SELECT * +FROM + (SELECT "some_users_data".user_id, "some_recent_users".value_3 + FROM + (SELECT + filter_users_1.user_id, value_3 + FROM + (SELECT + "users"."user_id" + FROM + users_reference_table as "users" + WHERE + user_id > 20 and user_id < 70 and users.value_2 = 200) filter_users_1 + JOIN LATERAL + (SELECT + user_id, value_3 + FROM + events_reference_table as "events" + WHERE + user_id > 20 and user_id < 70 AND + ("events".user_id = "filter_users_1".user_id) + ORDER BY + value_3 DESC + LIMIT 1) "last_events_1" ON true + ORDER BY value_3 DESC + LIMIT 10) "some_recent_users" + JOIN LATERAL + (SELECT + "users".user_id + FROM + users_reference_table as "users" + WHERE + "users"."user_id" = "some_recent_users"."user_id" AND + users.value_2 > 200 + LIMIT 1) "some_users_data" ON true + ORDER BY + value_3 DESC +LIMIT 10) "some_users" +ORDER BY + value_3 DESC +LIMIT 10; +SET citus.subquery_pushdown to OFF; + +-- LEFT JOINs used with INNER JOINs should error out since reference table exist in the +-- left side of the LEFT JOIN. +SELECT +count(*) AS cnt, "generated_group_field" + FROM + (SELECT + "eventQuery"."user_id", random(), generated_group_field + FROM + (SELECT + "multi_group_wrapper_1".*, generated_group_field, random() + FROM + (SELECT * + FROM + (SELECT + "events"."time", "events"."user_id" as event_user_id + FROM + events_table as "events" + WHERE + user_id > 80) "temp_data_queries" + INNER JOIN + (SELECT + "users"."user_id" + FROM + users_reference_table as "users" + WHERE + user_id > 80 and value_2 = 5) "user_filters_1" + ON ("temp_data_queries".event_user_id = "user_filters_1".user_id)) AS "multi_group_wrapper_1" + LEFT JOIN + (SELECT + "users"."user_id" AS "user_id", value_2 AS "generated_group_field" + FROM + users_table as "users") "left_group_by_1" + ON ("left_group_by_1".user_id = "multi_group_wrapper_1".event_user_id)) "eventQuery") "pushedDownQuery" + group BY + "generated_group_field" + ORDER BY + cnt DESC, generated_group_field ASC + LIMIT 10; + + -- RIGHT JOINs used with INNER JOINs should error out since reference table exist in the +-- right side of the RIGHT JOIN. +SELECT +count(*) AS cnt, "generated_group_field" + FROM + (SELECT + "eventQuery"."user_id", random(), generated_group_field + FROM + (SELECT + "multi_group_wrapper_1".*, generated_group_field, random() + FROM + (SELECT * + FROM + (SELECT + "events"."time", "events"."user_id" as event_user_id + FROM + events_table as "events" + WHERE + user_id > 80) "temp_data_queries" + INNER JOIN + (SELECT + "users"."user_id" + FROM + users_table as "users" + WHERE + user_id > 80 and value_2 = 5) "user_filters_1" + ON ("temp_data_queries".event_user_id = "user_filters_1".user_id)) AS "multi_group_wrapper_1" + RIGHT JOIN + (SELECT + "users"."user_id" AS "user_id", value_2 AS "generated_group_field" + FROM + users_reference_table as "users") "right_group_by_1" + ON ("right_group_by_1".user_id = "multi_group_wrapper_1".event_user_id)) "eventQuery") "pushedDownQuery" + group BY + "generated_group_field" + ORDER BY + cnt DESC, generated_group_field ASC + LIMIT 10; + + -- Outer subquery with reference table +SELECT "some_users_data".user_id, lastseen +FROM + (SELECT user_id, max(time) AS lastseen + FROM + (SELECT user_id, time + FROM + (SELECT + user_id, time + FROM + events_reference_table as "events" + WHERE + user_id > 10 and user_id < 40) "events_1" + ORDER BY + time DESC) "recent_events_1" + GROUP BY + user_id + ORDER BY + max(TIME) DESC) "some_recent_users" + FULL JOIN + (SELECT + "users".user_id + FROM + users_table as "users" + WHERE + users.value_2 > 50 and users.value_2 < 55) "some_users_data" + ON "some_users_data"."user_id" = "some_recent_users"."user_id" +ORDER BY + user_id +limit 50; \ No newline at end of file diff --git a/src/test/regress/sql/multi_subquery_in_where_reference_clause.sql b/src/test/regress/sql/multi_subquery_in_where_reference_clause.sql new file mode 100644 index 000000000..0126f362d --- /dev/null +++ b/src/test/regress/sql/multi_subquery_in_where_reference_clause.sql @@ -0,0 +1,130 @@ +-- +-- queries to test the subquery pushdown on reference tables + +-- subqueries in WHERE with greater operator +SELECT + user_id +FROM + users_table +WHERE + value_2 > + (SELECT + max(value_2) + FROM + events_reference_table + WHERE + users_table.user_id = events_reference_table.user_id AND event_type = 50 + GROUP BY + user_id + ) +GROUP BY user_id +HAVING count(*) > 66 +ORDER BY user_id +LIMIT 5; + +-- have reference table without any equality, should error out +SELECT + user_id +FROM + users_table +WHERE + value_2 > + (SELECT + max(value_2) + FROM + events_reference_table + WHERE + event_type = 50 + GROUP BY + user_id + ) +GROUP BY user_id +HAVING count(*) > 66 +ORDER BY user_id +LIMIT 5; + +-- users that appeared more than 118 times, should run since the reference table +-- on the right side of the semi join +SELECT + user_id +FROM + users_table +WHERE 118 <= + (SELECT + count(*) + FROM + events_reference_table + WHERE + users_table.user_id = events_reference_table.user_id + GROUP BY + user_id) +GROUP BY + user_id +ORDER BY + user_id; + +-- should error out since reference table exist on the left side +-- of the left lateral join +SELECT user_id, value_2 FROM users_table WHERE + value_1 > 101 AND value_1 < 110 + AND value_2 >= 5 + AND user_id IN + ( + SELECT + e1.user_id + FROM ( + -- Get the first time each user viewed the homepage. + SELECT + user_id, + 1 AS view_homepage, + min(time) AS view_homepage_time + FROM events_reference_table + WHERE + event_type IN (10, 20, 30, 40, 50, 60, 70, 80, 90) + GROUP BY user_id + ) e1 LEFT JOIN LATERAL ( + SELECT + user_id, + 1 AS use_demo, + time AS use_demo_time + FROM events_reference_table + WHERE + user_id = e1.user_id AND + event_type IN (11, 21, 31, 41, 51, 61, 71, 81, 91) + ORDER BY time + ) e2 ON true LEFT JOIN LATERAL ( + SELECT + user_id, + 1 AS enter_credit_card, + time AS enter_credit_card_time + FROM events_reference_table + WHERE + user_id = e2.user_id AND + event_type IN (12, 22, 32, 42, 52, 62, 72, 82, 92) + ORDER BY time + ) e3 ON true LEFT JOIN LATERAL ( + SELECT + 1 AS submit_card_info, + user_id, + time AS enter_credit_card_time + FROM events_reference_table + WHERE + user_id = e3.user_id AND + event_type IN (13, 23, 33, 43, 53, 63, 73, 83, 93) + ORDER BY time + ) e4 ON true LEFT JOIN LATERAL ( + SELECT + 1 AS see_bought_screen + FROM events_reference_table + WHERE + user_id = e4.user_id AND + event_type IN (14, 24, 34, 44, 54, 64, 74, 84, 94) + ORDER BY time + ) e5 ON true + group by e1.user_id + HAVING sum(submit_card_info) > 0 +) +ORDER BY 1, 2; + +DROP TABLE events_reference_table; +DROP TABLE users_reference_table; \ No newline at end of file