Add planner changes and tests for subquery on reference tables

pull/1532/head
velioglu 2017-08-01 12:13:49 +03:00
parent 45717dd013
commit c4e3b8b5e1
13 changed files with 1212 additions and 12 deletions

View File

@ -464,6 +464,13 @@ RouterModifyTaskForShardInterval(Query *originalQuery, ShardInterval *shardInter
}
shardOpExpressions = ShardIntervalOpExpressions(shardInterval, rteIndex);
/* means it is a reference table and do not add any shard interval information */
if (shardOpExpressions == NIL)
{
continue;
}
shardRestrictionList = make_simple_restrictinfo((Expr *) shardOpExpressions);
extendedBaseRestrictInfo = lappend(extendedBaseRestrictInfo,
shardRestrictionList);

View File

@ -997,6 +997,23 @@ TargetListOnPartitionColumn(Query *query, List *targetEntryList)
Expr *targetExpression = targetEntry->expr;
bool isPartitionColumn = IsPartitionColumn(targetExpression, query);
Oid relationId = InvalidOid;
Var *column = NULL;
FindReferencedTableColumn(targetExpression, NIL, query, &relationId, &column);
/*
* If the expression belongs to reference table directly returns true,
* since logic of caller function checks whether it can find the necessaary
* data from each node.
*/
if (IsDistributedTable(relationId) && PartitionMethod(relationId) ==
DISTRIBUTE_BY_NONE)
{
targetListOnPartitionColumn = true;
break;
}
if (isPartitionColumn)
{
FieldSelect *compositeField = CompositeFieldRecursive(targetExpression,
@ -2728,6 +2745,50 @@ ExtractRangeTableRelationWalker(Node *node, List **rangeTableRelationList)
}
/* Get the list of relations from the given node. Note that the difference between
* this function and ExtractRangeTableRelationWalker is that this one recursively
* walk into range table entries if it can.
*/
bool
ExtractRTRelationFromNode(Node *node, List **rangeTableList)
{
bool walkIsComplete = false;
if (node == NULL)
{
return false;
}
if (IsA(node, RangeTblEntry))
{
RangeTblEntry *rangeTableEntry = (RangeTblEntry *) node;
if (rangeTableEntry->rtekind == RTE_RELATION &&
rangeTableEntry->relkind != RELKIND_VIEW)
{
(*rangeTableList) = lappend(*rangeTableList, rangeTableEntry);
}
else if (rangeTableEntry->rtekind == RTE_SUBQUERY)
{
walkIsComplete = query_tree_walker(rangeTableEntry->subquery,
ExtractRTRelationFromNode,
rangeTableList, QTW_EXAMINE_RTES);
}
}
else if (IsA(node, Query))
{
walkIsComplete = query_tree_walker((Query *) node, ExtractRTRelationFromNode,
rangeTableList, QTW_EXAMINE_RTES);
}
else
{
walkIsComplete = expression_tree_walker(node, ExtractRTRelationFromNode,
rangeTableList);
}
return walkIsComplete;
}
/*
* ExtractRangeTableEntryWalker walks over a query tree, and finds all range
* table entries. For recursing into the query tree, this function uses the

View File

@ -127,6 +127,8 @@ static Job * BuildJobTreeTaskList(Job *jobTree,
static List * SubquerySqlTaskList(Job *job,
PlannerRestrictionContext *plannerRestrictionContext);
static void ErrorIfUnsupportedShardDistribution(Query *query);
static void ErrorIfUnsupportedJoinReferenceTable(
PlannerRestrictionContext *plannerRestrictionContext);
static bool CoPartitionedTables(Oid firstRelationId, Oid secondRelationId);
static bool ShardIntervalsEqual(FmgrInfo *comparisonFunction,
ShardInterval *firstInterval,
@ -195,6 +197,7 @@ static StringInfo MergeTableQueryString(uint32 taskIdIndex, List *targetEntryLis
static StringInfo IntermediateTableQueryString(uint64 jobId, uint32 taskIdIndex,
Query *reduceQuery);
static uint32 FinalTargetEntryCount(List *targetEntryList);
static bool ReferenceTableExist(PlannerInfo *plannerInfo, RelOptInfo *relationInfo);
/*
@ -2038,6 +2041,9 @@ SubquerySqlTaskList(Job *job, PlannerRestrictionContext *plannerRestrictionConte
/* error if shards are not co-partitioned */
ErrorIfUnsupportedShardDistribution(subquery);
/* error if unsupported join on reference tables */
ErrorIfUnsupportedJoinReferenceTable(plannerRestrictionContext);
/* get list of all range tables in subquery tree */
ExtractRangeTableRelationWalker((Node *) subquery, &rangeTableList);
@ -2061,7 +2067,16 @@ SubquerySqlTaskList(Job *job, PlannerRestrictionContext *plannerRestrictionConte
break;
}
Assert(targetCacheEntry != NULL);
/*
* That means all table are reference table and we can assign any reference
* table as an anchor one .
*/
if (targetCacheEntry == NULL)
{
RangeTblEntry *rangeTableEntry = (RangeTblEntry *) linitial(rangeTableList);
relationId = rangeTableEntry->relid;
targetCacheEntry = DistributedTableCacheEntry(relationId);
}
shardCount = targetCacheEntry->shardIntervalArrayLength;
for (shardOffset = 0; shardOffset < shardCount; shardOffset++)
@ -2088,6 +2103,146 @@ SubquerySqlTaskList(Job *job, PlannerRestrictionContext *plannerRestrictionConte
}
/*
* ErrorIfUnsupportedJoinReferenceTable errors out if there exists a outer join
* exist between reference table and distributed tables.
*/
static void
ErrorIfUnsupportedJoinReferenceTable(PlannerRestrictionContext *plannerRestrictionContext)
{
List *joinRestrictionList =
plannerRestrictionContext->joinRestrictionContext->joinRestrictionList;
ListCell *joinRestrictionCell = NULL;
foreach(joinRestrictionCell, joinRestrictionList)
{
JoinRestriction *joinRestriction = (JoinRestriction *) lfirst(
joinRestrictionCell);
JoinType joinType = joinRestriction->joinType;
PlannerInfo *plannerInfo = joinRestriction->plannerInfo;
RelOptInfo *innerrel = joinRestriction->innerrel;
RelOptInfo *outerrel = joinRestriction->outerrel;
switch (joinType)
{
case JOIN_SEMI:
{
if (ReferenceTableExist(plannerInfo, outerrel))
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("can not plan query having reference table on"
" the left part of semi join")));
}
}
break;
case JOIN_ANTI:
{
if (ReferenceTableExist(plannerInfo, innerrel))
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("can not plan query having reference table on"
" the left part of anti join")));
}
}
break;
case JOIN_LEFT:
{
if (ReferenceTableExist(plannerInfo, outerrel))
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("can not plan query having reference table on"
" the left part of left join")));
}
}
break;
case JOIN_RIGHT:
{
if (ReferenceTableExist(plannerInfo, innerrel))
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("can not plan query having reference table on"
" the right part of right join")));
}
}
break;
case JOIN_FULL:
{
if (ReferenceTableExist(plannerInfo, innerrel) || ReferenceTableExist(
plannerInfo, outerrel))
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg(
"can not plan query having reference table as a"
" part of full join")));
}
}
break;
default:
{ }
break;
}
}
}
/*
* ReferenceTableExist check whether the relationInfo has reference table.
* Since relation ids of relationInfo indexes to the range table entry list of
* query, query is also passed.
*/
static bool
ReferenceTableExist(PlannerInfo *plannerInfo, RelOptInfo *relationInfo)
{
Relids relids = bms_copy(relationInfo->relids);
int relationId = -1;
while ((relationId = bms_first_member(relids)) >= 0)
{
RangeTblEntry *rangeTableEntry = plannerInfo->simple_rte_array[relationId];
/* relationInfo has this range table entry */
if (RTEContainsReferenceTable(rangeTableEntry))
{
return true;
}
}
return false;
}
/*
* RTEContainsReferenceTable checks whether there exist a reference table in the
* given range table entry.
*/
bool
RTEContainsReferenceTable(RangeTblEntry *rangeTableEntry)
{
List *relationList = NIL;
ListCell *relationCell = NULL;
ExtractRTRelationFromNode((Node *) rangeTableEntry, &relationList);
foreach(relationCell, relationList)
{
RangeTblEntry *rangeTableEntry = (RangeTblEntry *) lfirst(relationCell);
Oid relationId = rangeTableEntry->relid;
if (IsDistributedTable(relationId) && PartitionMethod(relationId) ==
DISTRIBUTE_BY_NONE)
{
return true;
}
}
return false;
}
/*
* ErrorIfUnsupportedShardDistribution gets list of relations in the given query
* and checks if two conditions below hold for them, otherwise it errors out.
@ -2121,10 +2276,8 @@ ErrorIfUnsupportedShardDistribution(Query *query)
}
else
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot push down this subquery"),
errdetail("Currently range and hash partitioned "
"relations are supported")));
/* do not need to handle reference tables */
continue;
}
}
@ -2141,6 +2294,13 @@ ErrorIfUnsupportedShardDistribution(Query *query)
Oid relationId = lfirst_oid(relationIdCell);
bool coPartitionedTables = false;
Oid currentRelationId = relationId;
char partitionMethod = PartitionMethod(relationId);
/* do not need to check reference tables */
if (partitionMethod == DISTRIBUTE_BY_NONE)
{
continue;
}
/* get shard list of first relation and continue for the next relation */
if (relationIndex == 0)
@ -2298,9 +2458,6 @@ SubqueryTaskCreate(Query *originalQuery, ShardInterval *shardInterval,
RestrictInfo *shardRestrictionList = NULL;
DeferredErrorMessage *planningError = NULL;
/* such queries should go through router planner */
Assert(!restrictionContext->allReferenceTables);
/*
* Add the restriction qual parameter value in all baserestrictinfos.
* Note that this has to be done on a copy, as the originals are needed
@ -2315,6 +2472,12 @@ SubqueryTaskCreate(Query *originalQuery, ShardInterval *shardInterval,
shardOpExpressions = ShardIntervalOpExpressions(shardInterval, rteIndex);
/* means it is a reference table and do not add any shard interval info */
if (shardOpExpressions == NIL)
{
continue;
}
shardRestrictionList = make_simple_restrictinfo((Expr *) shardOpExpressions);
extendedBaseRestrictInfo = lappend(extendedBaseRestrictInfo,
shardRestrictionList);

View File

@ -740,6 +740,8 @@ multi_join_restriction_hook(PlannerInfo *root,
joinRestriction->joinType = jointype;
joinRestriction->joinRestrictInfoList = restrictInfoList;
joinRestriction->plannerInfo = root;
joinRestriction->innerrel = innerrel;
joinRestriction->outerrel = outerrel;
joinRestrictionContext->joinRestrictionList =
lappend(joinRestrictionContext->joinRestrictionList, joinRestriction);

View File

@ -300,10 +300,8 @@ ShardIntervalOpExpressions(ShardInterval *shardInterval, Index rteIndex)
}
else
{
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("cannot create shard interval operator expression for "
"distributed relations other than hash, range and append distributed "
"relations")));
/* do not add any shard range interval for reference tables */
return NIL;
}
/* build the base expression for constraint */

View File

@ -206,6 +206,7 @@ extern List * TableEntryList(List *rangeTableList);
extern List * UsedTableEntryList(Query *query);
extern bool ExtractRangeTableRelationWalker(Node *node, List **rangeTableList);
extern bool ExtractRangeTableEntryWalker(Node *node, List **rangeTableList);
extern bool ExtractRTRelationFromNode(Node *node, List **rangeTableList);
extern List * pull_var_clause_default(Node *node);
extern bool OperatorImplementsEquality(Oid opno);

View File

@ -286,6 +286,7 @@ extern Const * MakeInt4Constant(Datum constantValue);
extern int CompareShardPlacements(const void *leftElement, const void *rightElement);
extern bool ShardIntervalsOverlap(ShardInterval *firstInterval,
ShardInterval *secondInterval);
extern bool RTEContainsReferenceTable(RangeTblEntry *rangeTableEntry);
/* function declarations for Task and Task list operations */
extern bool TasksEqual(const Task *a, const Task *b);

View File

@ -52,6 +52,8 @@ typedef struct JoinRestriction
JoinType joinType;
List *joinRestrictInfoList;
PlannerInfo *plannerInfo;
RelOptInfo *innerrel;
RelOptInfo *outerrel;
} JoinRestriction;
typedef struct PlannerRestrictionContext

View File

@ -0,0 +1,376 @@
--
-- multi subquery complex queries aims to expand existing subquery pushdown
-- regression tests to cover more caeses
-- the tables that are used depends to multi_insert_select_behavioral_analytics_create_table.sql
--
-- We don't need shard id sequence here, so commented out to prevent conflicts with concurrent tests
-- ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1400000;
ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1400000;
SET citus.enable_router_execution TO FALSE;
CREATE TABLE events_reference_table as SELECT * FROM events_table;
CREATE TABLE users_reference_table as SELECT * FROM users_table;
SELECT create_reference_table('events_reference_table');
NOTICE: Copying data from local table...
create_reference_table
------------------------
(1 row)
SELECT create_reference_table('users_reference_table');
NOTICE: Copying data from local table...
create_reference_table
------------------------
(1 row)
-- LATERAL JOINs used with INNER JOINs with reference tables
SET citus.subquery_pushdown to ON;
SELECT user_id, lastseen
FROM
(SELECT
"some_users_data".user_id, lastseen
FROM
(SELECT
filter_users_1.user_id, time AS lastseen
FROM
(SELECT
user_where_1_1.user_id
FROM
(SELECT
"users"."user_id"
FROM
users_reference_table as "users"
WHERE
user_id > 12 and user_id < 16 and value_1 > 20) user_where_1_1
INNER JOIN
(SELECT
"users"."user_id"
FROM
users_reference_table as "users"
WHERE
user_id > 12 and user_id < 16 and value_2 > 60) user_where_1_join_1
ON ("user_where_1_1".user_id = "user_where_1_join_1".user_id))
filter_users_1
JOIN LATERAL
(SELECT
user_id, time
FROM
events_reference_table as "events"
WHERE
user_id > 12 and user_id < 16 AND
user_id = filter_users_1.user_id
ORDER BY
time DESC
LIMIT 1) "last_events_1"
ON TRUE
ORDER BY
time DESC
LIMIT 10) "some_recent_users"
JOIN LATERAL
(SELECT
"users".user_id
FROM
users_reference_table as "users"
WHERE
"users"."user_id" = "some_recent_users"."user_id" AND
"users"."value_2" > 70
LIMIT 1) "some_users_data"
ON TRUE
ORDER BY
lastseen DESC
LIMIT 10) "some_users"
ORDER BY
user_id DESC
LIMIT 10;
user_id | lastseen
---------+---------------------------------
14 | Tue Jan 21 05:46:51.286381 2014
14 | Tue Jan 21 05:46:51.286381 2014
14 | Tue Jan 21 05:46:51.286381 2014
14 | Tue Jan 21 05:46:51.286381 2014
14 | Tue Jan 21 05:46:51.286381 2014
14 | Tue Jan 21 05:46:51.286381 2014
14 | Tue Jan 21 05:46:51.286381 2014
14 | Tue Jan 21 05:46:51.286381 2014
14 | Tue Jan 21 05:46:51.286381 2014
14 | Tue Jan 21 05:46:51.286381 2014
(10 rows)
SET citus.subquery_pushdown to OFF;
-- NESTED INNER JOINs with reference tables
SELECT
count(*) AS value, "generated_group_field"
FROM
(SELECT
DISTINCT "pushedDownQuery"."real_user_id", "generated_group_field"
FROM
(SELECT
"eventQuery"."real_user_id", "eventQuery"."time", random(), ("eventQuery"."value_2") AS "generated_group_field"
FROM
(SELECT
*
FROM
(SELECT
"events"."time", "events"."user_id", "events"."value_2"
FROM
events_reference_table as "events"
WHERE
user_id > 10 and user_id < 40 AND event_type IN (40, 41, 42, 43, 44, 45) ) "temp_data_queries"
INNER JOIN
(SELECT
user_where_1_1.real_user_id
FROM
(SELECT
"users"."user_id" as real_user_id
FROM
users_reference_table as "users"
WHERE
user_id > 10 and user_id < 40 and value_2 > 50 ) user_where_1_1
INNER JOIN
(SELECT
"users"."user_id"
FROM
users_reference_table as "users"
WHERE
user_id > 10 and user_id < 40 and value_3 > 50 ) user_where_1_join_1
ON ("user_where_1_1".real_user_id = "user_where_1_join_1".user_id)) "user_filters_1"
ON ("temp_data_queries".user_id = "user_filters_1".real_user_id)) "eventQuery") "pushedDownQuery") "pushedDownQuery"
GROUP BY
"generated_group_field"
ORDER BY
generated_group_field DESC, value DESC;
value | generated_group_field
-------+-----------------------
1 | 966
1 | 917
1 | 905
1 | 868
1 | 836
1 | 791
1 | 671
1 | 642
1 | 358
1 | 317
1 | 307
1 | 302
1 | 214
1 | 166
1 | 116
1 | 1
(16 rows)
-- single level inner joins with reference tables
SELECT
"value_3", count(*) AS cnt
FROM
(SELECT
"value_3", "user_id", random()
FROM
(SELECT
users_in_segment_1.user_id, value_3
FROM
(SELECT
user_id, value_3 * 2 as value_3
FROM
(SELECT
user_id, value_3
FROM
(SELECT
"users"."user_id", value_3
FROM
users_reference_table as "users"
WHERE
user_id > 10 and user_id < 40 and value_2 > 30
) simple_user_where_1
) all_buckets_1
) users_in_segment_1
JOIN
(SELECT
"users"."user_id"
FROM
users_reference_table as "users"
WHERE
user_id > 10 and user_id < 40 and value_2 > 60
) some_users_data
ON ("users_in_segment_1".user_id = "some_users_data".user_id)
) segmentalias_1) "tempQuery"
GROUP BY "value_3"
ORDER BY cnt, value_3 DESC LIMIT 10;
value_3 | cnt
---------+-----
556 | 75
228 | 75
146 | 75
70 | 75
1442 | 79
1232 | 79
1090 | 79
1012 | 79
886 | 79
674 | 79
(10 rows)
-- nested LATERAL JOINs with reference tables
SET citus.subquery_pushdown to ON;
SELECT *
FROM
(SELECT "some_users_data".user_id, "some_recent_users".value_3
FROM
(SELECT
filter_users_1.user_id, value_3
FROM
(SELECT
"users"."user_id"
FROM
users_reference_table as "users"
WHERE
user_id > 20 and user_id < 70 and users.value_2 = 200) filter_users_1
JOIN LATERAL
(SELECT
user_id, value_3
FROM
events_reference_table as "events"
WHERE
user_id > 20 and user_id < 70 AND
("events".user_id = "filter_users_1".user_id)
ORDER BY
value_3 DESC
LIMIT 1) "last_events_1" ON true
ORDER BY value_3 DESC
LIMIT 10) "some_recent_users"
JOIN LATERAL
(SELECT
"users".user_id
FROM
users_reference_table as "users"
WHERE
"users"."user_id" = "some_recent_users"."user_id" AND
users.value_2 > 200
LIMIT 1) "some_users_data" ON true
ORDER BY
value_3 DESC
LIMIT 10) "some_users"
ORDER BY
value_3 DESC
LIMIT 10;
user_id | value_3
---------+---------
44 | 998
65 | 996
66 | 996
37 | 995
57 | 989
21 | 985
(6 rows)
SET citus.subquery_pushdown to OFF;
-- LEFT JOINs used with INNER JOINs should error out since reference table exist in the
-- left side of the LEFT JOIN.
SELECT
count(*) AS cnt, "generated_group_field"
FROM
(SELECT
"eventQuery"."user_id", random(), generated_group_field
FROM
(SELECT
"multi_group_wrapper_1".*, generated_group_field, random()
FROM
(SELECT *
FROM
(SELECT
"events"."time", "events"."user_id" as event_user_id
FROM
events_table as "events"
WHERE
user_id > 80) "temp_data_queries"
INNER JOIN
(SELECT
"users"."user_id"
FROM
users_reference_table as "users"
WHERE
user_id > 80 and value_2 = 5) "user_filters_1"
ON ("temp_data_queries".event_user_id = "user_filters_1".user_id)) AS "multi_group_wrapper_1"
LEFT JOIN
(SELECT
"users"."user_id" AS "user_id", value_2 AS "generated_group_field"
FROM
users_table as "users") "left_group_by_1"
ON ("left_group_by_1".user_id = "multi_group_wrapper_1".event_user_id)) "eventQuery") "pushedDownQuery"
group BY
"generated_group_field"
ORDER BY
cnt DESC, generated_group_field ASC
LIMIT 10;
ERROR: can not plan query having reference table on the left part of left join
-- RIGHT JOINs used with INNER JOINs should error out since reference table exist in the
-- right side of the RIGHT JOIN.
SELECT
count(*) AS cnt, "generated_group_field"
FROM
(SELECT
"eventQuery"."user_id", random(), generated_group_field
FROM
(SELECT
"multi_group_wrapper_1".*, generated_group_field, random()
FROM
(SELECT *
FROM
(SELECT
"events"."time", "events"."user_id" as event_user_id
FROM
events_table as "events"
WHERE
user_id > 80) "temp_data_queries"
INNER JOIN
(SELECT
"users"."user_id"
FROM
users_table as "users"
WHERE
user_id > 80 and value_2 = 5) "user_filters_1"
ON ("temp_data_queries".event_user_id = "user_filters_1".user_id)) AS "multi_group_wrapper_1"
RIGHT JOIN
(SELECT
"users"."user_id" AS "user_id", value_2 AS "generated_group_field"
FROM
users_reference_table as "users") "right_group_by_1"
ON ("right_group_by_1".user_id = "multi_group_wrapper_1".event_user_id)) "eventQuery") "pushedDownQuery"
group BY
"generated_group_field"
ORDER BY
cnt DESC, generated_group_field ASC
LIMIT 10;
ERROR: can not plan query having reference table on the left part of left join
-- Outer subquery with reference table
SELECT "some_users_data".user_id, lastseen
FROM
(SELECT user_id, max(time) AS lastseen
FROM
(SELECT user_id, time
FROM
(SELECT
user_id, time
FROM
events_reference_table as "events"
WHERE
user_id > 10 and user_id < 40) "events_1"
ORDER BY
time DESC) "recent_events_1"
GROUP BY
user_id
ORDER BY
max(TIME) DESC) "some_recent_users"
FULL JOIN
(SELECT
"users".user_id
FROM
users_table as "users"
WHERE
users.value_2 > 50 and users.value_2 < 55) "some_users_data"
ON "some_users_data"."user_id" = "some_recent_users"."user_id"
ORDER BY
user_id
limit 50;
ERROR: can not plan query having reference table as a part of full join

View File

@ -0,0 +1,144 @@
--
-- queries to test the subquery pushdown on reference tables
-- subqueries in WHERE with greater operator
SELECT
user_id
FROM
users_table
WHERE
value_2 >
(SELECT
max(value_2)
FROM
events_reference_table
WHERE
users_table.user_id = events_reference_table.user_id AND event_type = 50
GROUP BY
user_id
)
GROUP BY user_id
HAVING count(*) > 66
ORDER BY user_id
LIMIT 5;
user_id
---------
49
55
56
63
(4 rows)
-- have reference table without any equality, should error out
SELECT
user_id
FROM
users_table
WHERE
value_2 >
(SELECT
max(value_2)
FROM
events_reference_table
WHERE
event_type = 50
GROUP BY
user_id
)
GROUP BY user_id
HAVING count(*) > 66
ORDER BY user_id
LIMIT 5;
ERROR: cannot pushdown the subquery since all relations are not joined using distribution keys
DETAIL: Each relation should be joined with at least one another relation using distribution keys and equality operator.
-- users that appeared more than 118 times, should run since the reference table
-- on the right side of the semi join
SELECT
user_id
FROM
users_table
WHERE 118 <=
(SELECT
count(*)
FROM
events_reference_table
WHERE
users_table.user_id = events_reference_table.user_id
GROUP BY
user_id)
GROUP BY
user_id
ORDER BY
user_id;
user_id
---------
13
17
23
25
(4 rows)
-- should error out since reference table exist on the left side
-- of the left lateral join
SELECT user_id, value_2 FROM users_table WHERE
value_1 > 101 AND value_1 < 110
AND value_2 >= 5
AND user_id IN
(
SELECT
e1.user_id
FROM (
-- Get the first time each user viewed the homepage.
SELECT
user_id,
1 AS view_homepage,
min(time) AS view_homepage_time
FROM events_reference_table
WHERE
event_type IN (10, 20, 30, 40, 50, 60, 70, 80, 90)
GROUP BY user_id
) e1 LEFT JOIN LATERAL (
SELECT
user_id,
1 AS use_demo,
time AS use_demo_time
FROM events_reference_table
WHERE
user_id = e1.user_id AND
event_type IN (11, 21, 31, 41, 51, 61, 71, 81, 91)
ORDER BY time
) e2 ON true LEFT JOIN LATERAL (
SELECT
user_id,
1 AS enter_credit_card,
time AS enter_credit_card_time
FROM events_reference_table
WHERE
user_id = e2.user_id AND
event_type IN (12, 22, 32, 42, 52, 62, 72, 82, 92)
ORDER BY time
) e3 ON true LEFT JOIN LATERAL (
SELECT
1 AS submit_card_info,
user_id,
time AS enter_credit_card_time
FROM events_reference_table
WHERE
user_id = e3.user_id AND
event_type IN (13, 23, 33, 43, 53, 63, 73, 83, 93)
ORDER BY time
) e4 ON true LEFT JOIN LATERAL (
SELECT
1 AS see_bought_screen
FROM events_reference_table
WHERE
user_id = e4.user_id AND
event_type IN (14, 24, 34, 44, 54, 64, 74, 84, 94)
ORDER BY time
) e5 ON true
group by e1.user_id
HAVING sum(submit_card_info) > 0
)
ORDER BY 1, 2;
ERROR: can not plan query having reference table on the left part of left join
DROP TABLE events_reference_table;
DROP TABLE users_reference_table;

View File

@ -51,6 +51,8 @@ test: multi_deparse_shard_query multi_distributed_transaction_id
test: multi_basic_queries multi_complex_expressions
test: multi_explain
test: multi_subquery multi_subquery_complex_queries multi_subquery_behavioral_analytics
test: multi_subquery_complex_reference_clause
test: multi_subquery_in_where_reference_clause
test: multi_subquery_union multi_subquery_in_where_clause multi_subquery_misc
test: multi_reference_table
test: multi_outer_join_reference

View File

@ -0,0 +1,313 @@
--
-- multi subquery complex queries aims to expand existing subquery pushdown
-- regression tests to cover more caeses
-- the tables that are used depends to multi_insert_select_behavioral_analytics_create_table.sql
--
-- We don't need shard id sequence here, so commented out to prevent conflicts with concurrent tests
-- ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1400000;
ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1400000;
SET citus.enable_router_execution TO FALSE;
CREATE TABLE events_reference_table as SELECT * FROM events_table;
CREATE TABLE users_reference_table as SELECT * FROM users_table;
SELECT create_reference_table('events_reference_table');
SELECT create_reference_table('users_reference_table');
-- LATERAL JOINs used with INNER JOINs with reference tables
SET citus.subquery_pushdown to ON;
SELECT user_id, lastseen
FROM
(SELECT
"some_users_data".user_id, lastseen
FROM
(SELECT
filter_users_1.user_id, time AS lastseen
FROM
(SELECT
user_where_1_1.user_id
FROM
(SELECT
"users"."user_id"
FROM
users_reference_table as "users"
WHERE
user_id > 12 and user_id < 16 and value_1 > 20) user_where_1_1
INNER JOIN
(SELECT
"users"."user_id"
FROM
users_reference_table as "users"
WHERE
user_id > 12 and user_id < 16 and value_2 > 60) user_where_1_join_1
ON ("user_where_1_1".user_id = "user_where_1_join_1".user_id))
filter_users_1
JOIN LATERAL
(SELECT
user_id, time
FROM
events_reference_table as "events"
WHERE
user_id > 12 and user_id < 16 AND
user_id = filter_users_1.user_id
ORDER BY
time DESC
LIMIT 1) "last_events_1"
ON TRUE
ORDER BY
time DESC
LIMIT 10) "some_recent_users"
JOIN LATERAL
(SELECT
"users".user_id
FROM
users_reference_table as "users"
WHERE
"users"."user_id" = "some_recent_users"."user_id" AND
"users"."value_2" > 70
LIMIT 1) "some_users_data"
ON TRUE
ORDER BY
lastseen DESC
LIMIT 10) "some_users"
ORDER BY
user_id DESC
LIMIT 10;
SET citus.subquery_pushdown to OFF;
-- NESTED INNER JOINs with reference tables
SELECT
count(*) AS value, "generated_group_field"
FROM
(SELECT
DISTINCT "pushedDownQuery"."real_user_id", "generated_group_field"
FROM
(SELECT
"eventQuery"."real_user_id", "eventQuery"."time", random(), ("eventQuery"."value_2") AS "generated_group_field"
FROM
(SELECT
*
FROM
(SELECT
"events"."time", "events"."user_id", "events"."value_2"
FROM
events_reference_table as "events"
WHERE
user_id > 10 and user_id < 40 AND event_type IN (40, 41, 42, 43, 44, 45) ) "temp_data_queries"
INNER JOIN
(SELECT
user_where_1_1.real_user_id
FROM
(SELECT
"users"."user_id" as real_user_id
FROM
users_reference_table as "users"
WHERE
user_id > 10 and user_id < 40 and value_2 > 50 ) user_where_1_1
INNER JOIN
(SELECT
"users"."user_id"
FROM
users_reference_table as "users"
WHERE
user_id > 10 and user_id < 40 and value_3 > 50 ) user_where_1_join_1
ON ("user_where_1_1".real_user_id = "user_where_1_join_1".user_id)) "user_filters_1"
ON ("temp_data_queries".user_id = "user_filters_1".real_user_id)) "eventQuery") "pushedDownQuery") "pushedDownQuery"
GROUP BY
"generated_group_field"
ORDER BY
generated_group_field DESC, value DESC;
-- single level inner joins with reference tables
SELECT
"value_3", count(*) AS cnt
FROM
(SELECT
"value_3", "user_id", random()
FROM
(SELECT
users_in_segment_1.user_id, value_3
FROM
(SELECT
user_id, value_3 * 2 as value_3
FROM
(SELECT
user_id, value_3
FROM
(SELECT
"users"."user_id", value_3
FROM
users_reference_table as "users"
WHERE
user_id > 10 and user_id < 40 and value_2 > 30
) simple_user_where_1
) all_buckets_1
) users_in_segment_1
JOIN
(SELECT
"users"."user_id"
FROM
users_reference_table as "users"
WHERE
user_id > 10 and user_id < 40 and value_2 > 60
) some_users_data
ON ("users_in_segment_1".user_id = "some_users_data".user_id)
) segmentalias_1) "tempQuery"
GROUP BY "value_3"
ORDER BY cnt, value_3 DESC LIMIT 10;
-- nested LATERAL JOINs with reference tables
SET citus.subquery_pushdown to ON;
SELECT *
FROM
(SELECT "some_users_data".user_id, "some_recent_users".value_3
FROM
(SELECT
filter_users_1.user_id, value_3
FROM
(SELECT
"users"."user_id"
FROM
users_reference_table as "users"
WHERE
user_id > 20 and user_id < 70 and users.value_2 = 200) filter_users_1
JOIN LATERAL
(SELECT
user_id, value_3
FROM
events_reference_table as "events"
WHERE
user_id > 20 and user_id < 70 AND
("events".user_id = "filter_users_1".user_id)
ORDER BY
value_3 DESC
LIMIT 1) "last_events_1" ON true
ORDER BY value_3 DESC
LIMIT 10) "some_recent_users"
JOIN LATERAL
(SELECT
"users".user_id
FROM
users_reference_table as "users"
WHERE
"users"."user_id" = "some_recent_users"."user_id" AND
users.value_2 > 200
LIMIT 1) "some_users_data" ON true
ORDER BY
value_3 DESC
LIMIT 10) "some_users"
ORDER BY
value_3 DESC
LIMIT 10;
SET citus.subquery_pushdown to OFF;
-- LEFT JOINs used with INNER JOINs should error out since reference table exist in the
-- left side of the LEFT JOIN.
SELECT
count(*) AS cnt, "generated_group_field"
FROM
(SELECT
"eventQuery"."user_id", random(), generated_group_field
FROM
(SELECT
"multi_group_wrapper_1".*, generated_group_field, random()
FROM
(SELECT *
FROM
(SELECT
"events"."time", "events"."user_id" as event_user_id
FROM
events_table as "events"
WHERE
user_id > 80) "temp_data_queries"
INNER JOIN
(SELECT
"users"."user_id"
FROM
users_reference_table as "users"
WHERE
user_id > 80 and value_2 = 5) "user_filters_1"
ON ("temp_data_queries".event_user_id = "user_filters_1".user_id)) AS "multi_group_wrapper_1"
LEFT JOIN
(SELECT
"users"."user_id" AS "user_id", value_2 AS "generated_group_field"
FROM
users_table as "users") "left_group_by_1"
ON ("left_group_by_1".user_id = "multi_group_wrapper_1".event_user_id)) "eventQuery") "pushedDownQuery"
group BY
"generated_group_field"
ORDER BY
cnt DESC, generated_group_field ASC
LIMIT 10;
-- RIGHT JOINs used with INNER JOINs should error out since reference table exist in the
-- right side of the RIGHT JOIN.
SELECT
count(*) AS cnt, "generated_group_field"
FROM
(SELECT
"eventQuery"."user_id", random(), generated_group_field
FROM
(SELECT
"multi_group_wrapper_1".*, generated_group_field, random()
FROM
(SELECT *
FROM
(SELECT
"events"."time", "events"."user_id" as event_user_id
FROM
events_table as "events"
WHERE
user_id > 80) "temp_data_queries"
INNER JOIN
(SELECT
"users"."user_id"
FROM
users_table as "users"
WHERE
user_id > 80 and value_2 = 5) "user_filters_1"
ON ("temp_data_queries".event_user_id = "user_filters_1".user_id)) AS "multi_group_wrapper_1"
RIGHT JOIN
(SELECT
"users"."user_id" AS "user_id", value_2 AS "generated_group_field"
FROM
users_reference_table as "users") "right_group_by_1"
ON ("right_group_by_1".user_id = "multi_group_wrapper_1".event_user_id)) "eventQuery") "pushedDownQuery"
group BY
"generated_group_field"
ORDER BY
cnt DESC, generated_group_field ASC
LIMIT 10;
-- Outer subquery with reference table
SELECT "some_users_data".user_id, lastseen
FROM
(SELECT user_id, max(time) AS lastseen
FROM
(SELECT user_id, time
FROM
(SELECT
user_id, time
FROM
events_reference_table as "events"
WHERE
user_id > 10 and user_id < 40) "events_1"
ORDER BY
time DESC) "recent_events_1"
GROUP BY
user_id
ORDER BY
max(TIME) DESC) "some_recent_users"
FULL JOIN
(SELECT
"users".user_id
FROM
users_table as "users"
WHERE
users.value_2 > 50 and users.value_2 < 55) "some_users_data"
ON "some_users_data"."user_id" = "some_recent_users"."user_id"
ORDER BY
user_id
limit 50;

View File

@ -0,0 +1,130 @@
--
-- queries to test the subquery pushdown on reference tables
-- subqueries in WHERE with greater operator
SELECT
user_id
FROM
users_table
WHERE
value_2 >
(SELECT
max(value_2)
FROM
events_reference_table
WHERE
users_table.user_id = events_reference_table.user_id AND event_type = 50
GROUP BY
user_id
)
GROUP BY user_id
HAVING count(*) > 66
ORDER BY user_id
LIMIT 5;
-- have reference table without any equality, should error out
SELECT
user_id
FROM
users_table
WHERE
value_2 >
(SELECT
max(value_2)
FROM
events_reference_table
WHERE
event_type = 50
GROUP BY
user_id
)
GROUP BY user_id
HAVING count(*) > 66
ORDER BY user_id
LIMIT 5;
-- users that appeared more than 118 times, should run since the reference table
-- on the right side of the semi join
SELECT
user_id
FROM
users_table
WHERE 118 <=
(SELECT
count(*)
FROM
events_reference_table
WHERE
users_table.user_id = events_reference_table.user_id
GROUP BY
user_id)
GROUP BY
user_id
ORDER BY
user_id;
-- should error out since reference table exist on the left side
-- of the left lateral join
SELECT user_id, value_2 FROM users_table WHERE
value_1 > 101 AND value_1 < 110
AND value_2 >= 5
AND user_id IN
(
SELECT
e1.user_id
FROM (
-- Get the first time each user viewed the homepage.
SELECT
user_id,
1 AS view_homepage,
min(time) AS view_homepage_time
FROM events_reference_table
WHERE
event_type IN (10, 20, 30, 40, 50, 60, 70, 80, 90)
GROUP BY user_id
) e1 LEFT JOIN LATERAL (
SELECT
user_id,
1 AS use_demo,
time AS use_demo_time
FROM events_reference_table
WHERE
user_id = e1.user_id AND
event_type IN (11, 21, 31, 41, 51, 61, 71, 81, 91)
ORDER BY time
) e2 ON true LEFT JOIN LATERAL (
SELECT
user_id,
1 AS enter_credit_card,
time AS enter_credit_card_time
FROM events_reference_table
WHERE
user_id = e2.user_id AND
event_type IN (12, 22, 32, 42, 52, 62, 72, 82, 92)
ORDER BY time
) e3 ON true LEFT JOIN LATERAL (
SELECT
1 AS submit_card_info,
user_id,
time AS enter_credit_card_time
FROM events_reference_table
WHERE
user_id = e3.user_id AND
event_type IN (13, 23, 33, 43, 53, 63, 73, 83, 93)
ORDER BY time
) e4 ON true LEFT JOIN LATERAL (
SELECT
1 AS see_bought_screen
FROM events_reference_table
WHERE
user_id = e4.user_id AND
event_type IN (14, 24, 34, 44, 54, 64, 74, 84, 94)
ORDER BY time
) e5 ON true
group by e1.user_id
HAVING sum(submit_card_info) > 0
)
ORDER BY 1, 2;
DROP TABLE events_reference_table;
DROP TABLE users_reference_table;