From 7d212b847f56bf9fd56b6e7c05bc3825917a2594 Mon Sep 17 00:00:00 2001 From: Metin Doslu Date: Fri, 26 Aug 2016 16:00:31 +0300 Subject: [PATCH 1/2] Add outer join clause list extraction for subquery pushdown logic In subquery pushdown, we allow outer joins if the join condition is on the partition columns. WhereClauseList() used to return all join conditions including outer joins. However, this has been changed with a commit related to outer join support on regular queries. With this commit, we refactored ExtractFromExpressionWalker() to return two lists of qualifiers. The first list is for inner join and filter clauses and the second list is for outer join clauses. Therefore, we can also use outer join clauses to check subquery pushdown prerequisites. --- .../planner/multi_logical_optimizer.c | 16 +-- .../planner/multi_logical_planner.c | 98 ++++++++++++++----- .../distributed/multi_logical_planner.h | 1 + 3 files changed, 82 insertions(+), 33 deletions(-) diff --git a/src/backend/distributed/planner/multi_logical_optimizer.c b/src/backend/distributed/planner/multi_logical_optimizer.c index 395f94bf0..77914f4ec 100644 --- a/src/backend/distributed/planner/multi_logical_optimizer.c +++ b/src/backend/distributed/planner/multi_logical_optimizer.c @@ -3400,10 +3400,10 @@ SupportedLateralQuery(Query *parentQuery, Query *lateralQuery) bool supportedLateralQuery = false; List *outerCompositeFieldList = NIL; List *localCompositeFieldList = NIL; - List *whereClauseList = WhereClauseList(lateralQuery->jointree); + ListCell *qualifierCell = NULL; - ListCell *whereClauseCell = NULL; - foreach(whereClauseCell, whereClauseList) + List *qualifierList = QualifierList(lateralQuery->jointree); + foreach(qualifierCell, qualifierList) { OpExpr *operatorExpression = NULL; List *argumentList = NIL; @@ -3417,13 +3417,13 @@ SupportedLateralQuery(Query *parentQuery, Query *lateralQuery) bool outerColumnIsPartitionColumn = false; bool localColumnIsPartitionColumn = false; - Node *clause = (Node *) lfirst(whereClauseCell); - if (!IsA(clause, OpExpr)) + Node *qualifier = (Node *) lfirst(qualifierCell); + if (!IsA(qualifier, OpExpr)) { continue; } - operatorExpression = (OpExpr *) clause; + operatorExpression = (OpExpr *) qualifier; argumentList = operatorExpression->args; /* @@ -3566,8 +3566,8 @@ JoinOnPartitionColumn(Query *query) bool joinOnPartitionColumn = false; List *leftCompositeFieldList = NIL; List *rightCompositeFieldList = NIL; - List *whereClauseList = WhereClauseList(query->jointree); - List *joinClauseList = JoinClauseList(whereClauseList); + List *qualifierList = QualifierList(query->jointree); + List *joinClauseList = JoinClauseList(qualifierList); ListCell *joinClauseCell = NULL; foreach(joinClauseCell, joinClauseList) diff --git a/src/backend/distributed/planner/multi_logical_planner.c b/src/backend/distributed/planner/multi_logical_planner.c index fa4384a02..067835338 100644 --- a/src/backend/distributed/planner/multi_logical_planner.c +++ b/src/backend/distributed/planner/multi_logical_planner.c @@ -38,6 +38,14 @@ bool SubqueryPushdown = false; /* is subquery pushdown enabled */ +/* Struct to differentiate different qualifier types in an expression tree walker */ +typedef struct QualifierWalkerContext +{ + List *baseQualifierList; + List *outerJoinQualifierList; +} QualifierWalkerContext; + + /* Function pointer type definition for apply join rule functions */ typedef MultiNode *(*RuleApplyFunction) (MultiNode *leftNode, MultiNode *rightNode, Var *partitionColumn, JoinType joinType, @@ -56,7 +64,8 @@ static bool HasOuterJoinWalker(Node *node, void *maxJoinLevel); static bool HasComplexJoinOrder(Query *queryTree); static bool HasComplexRangeTableType(Query *queryTree); static void ValidateClauseList(List *clauseList); -static bool ExtractFromExpressionWalker(Node *node, List **qualifierList); +static bool ExtractFromExpressionWalker(Node *node, + QualifierWalkerContext *walkerContext); static List * MultiTableNodeList(List *tableEntryList, List *rangeTableList); static List * AddMultiCollectNodes(List *tableNodeList); static MultiNode * MultiJoinTree(List *joinOrderList, List *collectTableList, @@ -720,23 +729,48 @@ ExtractRangeTableIndexWalker(Node *node, List **rangeTableIndexList) /* * WhereClauseList walks over the FROM expression in the query tree, and builds * a list of all clauses from the expression tree. The function checks for both - * implicitly and explicitly defined clauses. Explicit clauses are expressed as - * "SELECT ... FROM R1 INNER JOIN R2 ON R1.A = R2.A". Implicit joins differ in - * that they live in the WHERE clause, and are expressed as "SELECT ... FROM - * ... WHERE R1.a = R2.a". + * implicitly and explicitly defined clauses, but only selects INNER join + * explicit clauses, and skips any outer-join clauses. Explicit clauses are + * expressed as "SELECT ... FROM R1 INNER JOIN R2 ON R1.A = R2.A". Implicit + * joins differ in that they live in the WHERE clause, and are expressed as + * "SELECT ... FROM ... WHERE R1.a = R2.a". */ List * WhereClauseList(FromExpr *fromExpr) { FromExpr *fromExprCopy = copyObject(fromExpr); + QualifierWalkerContext *walkerContext = palloc0(sizeof(QualifierWalkerContext)); List *whereClauseList = NIL; - ExtractFromExpressionWalker((Node *) fromExprCopy, &whereClauseList); + ExtractFromExpressionWalker((Node *) fromExprCopy, walkerContext); + whereClauseList = walkerContext->baseQualifierList; return whereClauseList; } +/* + * QualifierList walks over the FROM expression in the query tree, and builds + * a list of all qualifiers from the expression tree. The function checks for + * both implicitly and explicitly defined qualifiers. Note that this function + * is very similar to WhereClauseList(), but QualifierList() also includes + * outer-join clauses. + */ +List * +QualifierList(FromExpr *fromExpr) +{ + FromExpr *fromExprCopy = copyObject(fromExpr); + QualifierWalkerContext *walkerContext = palloc0(sizeof(QualifierWalkerContext)); + List *qualifierList = NIL; + + ExtractFromExpressionWalker((Node *) fromExprCopy, walkerContext); + qualifierList = list_concat(qualifierList, walkerContext->baseQualifierList); + qualifierList = list_concat(qualifierList, walkerContext->outerJoinQualifierList); + + return qualifierList; +} + + /* * ValidateClauseList walks over the given list of clauses, and checks that we * can recognize all the clauses. This function ensures that we do not drop an @@ -790,8 +824,15 @@ JoinClauseList(List *whereClauseList) /* * ExtractFromExpressionWalker walks over a FROM expression, and finds all - * explicit qualifiers in the expression. The function looks at join and from - * expression nodes to find explicit qualifiers, and returns these qualifiers. + * implicit and explicit qualifiers in the expression. The function looks at + * join and from expression nodes to find qualifiers, and returns these + * qualifiers. + * + * Note that we don't want outer join clauses in regular outer join planning, + * but we need outer join clauses in subquery pushdown prerequisite checks. + * Therefore, outer join qualifiers are returned in a different list than other + * qualifiers inside the given walker context. For this reason, we return two + * qualifier lists. * * Note that we check if the qualifier node in join and from expression nodes * is a list node. If it is not a list node which is the case for subqueries, @@ -803,7 +844,7 @@ JoinClauseList(List *whereClauseList) * query tree. */ static bool -ExtractFromExpressionWalker(Node *node, List **qualifierList) +ExtractFromExpressionWalker(Node *node, QualifierWalkerContext *walkerContext) { bool walkerResult = false; if (node == NULL) @@ -824,11 +865,7 @@ ExtractFromExpressionWalker(Node *node, List **qualifierList) Node *joinQualifiersNode = joinExpression->quals; JoinType joinType = joinExpression->jointype; - /* - * We only extract qualifiers from inner join clauses, which can be - * treated as WHERE clauses. - */ - if (joinQualifiersNode != NULL && joinType == JOIN_INNER) + if (joinQualifiersNode != NULL) { if (IsA(joinQualifiersNode, List)) { @@ -841,8 +878,18 @@ ExtractFromExpressionWalker(Node *node, List **qualifierList) joinClause = (Node *) canonicalize_qual((Expr *) joinClause); joinQualifierList = make_ands_implicit((Expr *) joinClause); } + } - (*qualifierList) = list_concat(*qualifierList, joinQualifierList); + /* return outer join clauses in a separate list */ + if (joinType == JOIN_INNER) + { + walkerContext->baseQualifierList = + list_concat(walkerContext->baseQualifierList, joinQualifierList); + } + else if (IS_OUTER_JOIN(joinType)) + { + walkerContext->outerJoinQualifierList = + list_concat(walkerContext->outerJoinQualifierList, joinQualifierList); } } else if (IsA(node, FromExpr)) @@ -865,12 +912,13 @@ ExtractFromExpressionWalker(Node *node, List **qualifierList) fromQualifierList = make_ands_implicit((Expr *) fromClause); } - (*qualifierList) = list_concat(*qualifierList, fromQualifierList); + walkerContext->baseQualifierList = + list_concat(walkerContext->baseQualifierList, fromQualifierList); } } walkerResult = expression_tree_walker(node, ExtractFromExpressionWalker, - (void *) qualifierList); + (void *) walkerContext); return walkerResult; } @@ -1867,8 +1915,8 @@ static MultiNode * SubqueryPushdownMultiPlanTree(Query *queryTree, List *subqueryEntryList) { List *targetEntryList = queryTree->targetList; - List *whereClauseList = NIL; - List *whereClauseColumnList = NIL; + List *qualifierList = NIL; + List *qualifierColumnList = NIL; List *targetListColumnList = NIL; List *columnList = NIL; ListCell *columnCell = NULL; @@ -1884,9 +1932,9 @@ SubqueryPushdownMultiPlanTree(Query *queryTree, List *subqueryEntryList) ErrorIfQueryNotSupported(queryTree); ErrorIfSubqueryJoin(queryTree); - /* extract where clause qualifiers and verify we can plan for them */ - whereClauseList = WhereClauseList(queryTree->jointree); - ValidateClauseList(whereClauseList); + /* extract qualifiers and verify we can plan for them */ + qualifierList = QualifierList(queryTree->jointree); + ValidateClauseList(qualifierList); /* * We disregard pulled subqueries. This changes order of range table list. @@ -1897,10 +1945,10 @@ SubqueryPushdownMultiPlanTree(Query *queryTree, List *subqueryEntryList) */ Assert(list_length(subqueryEntryList) == 1); - whereClauseColumnList = pull_var_clause_default((Node *) whereClauseList); + qualifierColumnList = pull_var_clause_default((Node *) qualifierList); targetListColumnList = pull_var_clause_default((Node *) targetEntryList); - columnList = list_concat(whereClauseColumnList, targetListColumnList); + columnList = list_concat(qualifierColumnList, targetListColumnList); foreach(columnCell, columnList) { Var *column = (Var *) lfirst(columnCell); @@ -1915,7 +1963,7 @@ SubqueryPushdownMultiPlanTree(Query *queryTree, List *subqueryEntryList) currentTopNode = (MultiNode *) subqueryCollectNode; /* build select node if the query has selection criteria */ - selectNode = MultiSelectNode(whereClauseList); + selectNode = MultiSelectNode(qualifierList); if (selectNode != NULL) { SetChild((MultiUnaryNode *) selectNode, currentTopNode); diff --git a/src/include/distributed/multi_logical_planner.h b/src/include/distributed/multi_logical_planner.h index 839dd740b..3698934d3 100644 --- a/src/include/distributed/multi_logical_planner.h +++ b/src/include/distributed/multi_logical_planner.h @@ -198,6 +198,7 @@ extern bool IsJoinClause(Node *clause); extern List * SubqueryEntryList(Query *queryTree); extern bool ExtractRangeTableIndexWalker(Node *node, List **rangeTableIndexList); extern List * WhereClauseList(FromExpr *fromExpr); +extern List * QualifierList(FromExpr *fromExpr); extern List * TableEntryList(List *rangeTableList); extern bool ExtractRangeTableRelationWalker(Node *node, List **rangeTableList); extern bool ExtractRangeTableEntryWalker(Node *node, List **rangeTableList); From 5b50f2c33390631b0410b961bdbe75e5c46de106 Mon Sep 17 00:00:00 2001 From: Metin Doslu Date: Mon, 29 Aug 2016 16:06:34 +0300 Subject: [PATCH 2/2] Add complex subquery pushdown regression tests --- src/test/regress/input/multi_subquery.source | 538 ++++++++++++++ src/test/regress/output/multi_subquery.source | 658 ++++++++++++++++++ 2 files changed, 1196 insertions(+) diff --git a/src/test/regress/input/multi_subquery.source b/src/test/regress/input/multi_subquery.source index e3f684b34..abbacc398 100644 --- a/src/test/regress/input/multi_subquery.source +++ b/src/test/regress/input/multi_subquery.source @@ -377,3 +377,541 @@ SELECT * FROM AS foo; DROP TABLE subquery_pruning_varchar_test_table; + +-- Create composite type to use in subquery pushdown +CREATE TYPE user_composite_type AS +( + tenant_id BIGINT, + user_id BIGINT +); + +\c - - - :worker_1_port + +CREATE TYPE user_composite_type AS +( + tenant_id BIGINT, + user_id BIGINT +); + +\c - - - :worker_2_port + +CREATE TYPE user_composite_type AS +( + tenant_id BIGINT, + user_id BIGINT +); + +\c - - - :master_port + + +CREATE TABLE events ( + composite_id user_composite_type, + event_id bigint, + event_type character varying(255), + event_time bigint +); +SELECT master_create_distributed_table('events', 'composite_id', 'range'); + +SELECT master_create_empty_shard('events') AS new_shard_id +\gset +UPDATE pg_dist_shard SET shardminvalue = '(1,1)', shardmaxvalue = '(1,2000000000)' +WHERE shardid = :new_shard_id; + +SELECT master_create_empty_shard('events') AS new_shard_id +\gset +UPDATE pg_dist_shard SET shardminvalue = '(1,2000000001)', shardmaxvalue = '(1,4300000000)' +WHERE shardid = :new_shard_id; + +SELECT master_create_empty_shard('events') AS new_shard_id +\gset +UPDATE pg_dist_shard SET shardminvalue = '(2,1)', shardmaxvalue = '(2,2000000000)' +WHERE shardid = :new_shard_id; + +SELECT master_create_empty_shard('events') AS new_shard_id +\gset +UPDATE pg_dist_shard SET shardminvalue = '(2,2000000001)', shardmaxvalue = '(2,4300000000)' +WHERE shardid = :new_shard_id; + +\COPY events FROM STDIN WITH CSV +"(1,1001)",20001,click,1472807012 +"(1,1001)",20002,submit,1472807015 +"(1,1001)",20003,pay,1472807020 +"(1,1002)",20010,click,1472807022 +"(1,1002)",20011,click,1472807023 +"(1,1002)",20012,submit,1472807025 +"(1,1002)",20013,pay,1472807030 +"(1,1003)",20014,click,1472807032 +"(1,1003)",20015,click,1472807033 +"(1,1003)",20016,click,1472807034 +"(1,1003)",20017,submit,1472807035 +\. + +CREATE TABLE users ( + composite_id user_composite_type, + lastseen bigint +); +SELECT master_create_distributed_table('users', 'composite_id', 'range'); + +SELECT master_create_empty_shard('users') AS new_shard_id +\gset +UPDATE pg_dist_shard SET shardminvalue = '(1,1)', shardmaxvalue = '(1,2000000000)' +WHERE shardid = :new_shard_id; + +SELECT master_create_empty_shard('users') AS new_shard_id +\gset +UPDATE pg_dist_shard SET shardminvalue = '(1,2000000001)', shardmaxvalue = '(1,4300000000)' +WHERE shardid = :new_shard_id; + +SELECT master_create_empty_shard('users') AS new_shard_id +\gset +UPDATE pg_dist_shard SET shardminvalue = '(2,1)', shardmaxvalue = '(2,2000000000)' +WHERE shardid = :new_shard_id; + +SELECT master_create_empty_shard('users') AS new_shard_id +\gset +UPDATE pg_dist_shard SET shardminvalue = '(2,2000000001)', shardmaxvalue = '(2,4300000000)' +WHERE shardid = :new_shard_id; + +\COPY users FROM STDIN WITH CSV +"(1,1001)",1472807115 +"(1,1002)",1472807215 +"(1,1003)",1472807315 +\. + + +SET citus.subquery_pushdown TO TRUE; + +-- Simple join subquery pushdown +SELECT + avg(array_length(events, 1)) AS event_average +FROM + (SELECT + tenant_id, + user_id, + array_agg(event_type ORDER BY event_time) AS events + FROM + (SELECT + (users.composite_id).tenant_id, + (users.composite_id).user_id, + event_type, + events.event_time + FROM + users, + events + WHERE + (users.composite_id).tenant_id = (events.composite_id).tenant_id AND + (users.composite_id).user_id = (events.composite_id).user_id AND + users.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND + users.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND + event_type IN ('click', 'submit', 'pay')) AS subquery + GROUP BY + tenant_id, + user_id) AS subquery; + +-- Union and left join subquery pushdown +SELECT + avg(array_length(events, 1)) AS event_average, + hasdone +FROM + (SELECT + subquery_1.tenant_id, + subquery_1.user_id, + array_agg(event ORDER BY event_time) AS events, + COALESCE(hasdone, 'Has not done paying') AS hasdone + FROM + ( + (SELECT + (users.composite_id).tenant_id, + (users.composite_id).user_id, + 'action=>1'AS event, + events.event_time + FROM + users, + events + WHERE + (users.composite_id).tenant_id = (events.composite_id).tenant_id AND + (users.composite_id).user_id = (events.composite_id).user_id AND + users.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND + users.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND + event_type = 'click') + UNION + (SELECT + (users.composite_id).tenant_id, + (users.composite_id).user_id, + 'action=>2'AS event, + events.event_time + FROM + users, + events + WHERE + (users.composite_id).tenant_id = (events.composite_id).tenant_id AND + (users.composite_id).user_id = (events.composite_id).user_id AND + users.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND + users.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND + event_type = 'submit') + ) AS subquery_1 + LEFT JOIN + (SELECT + DISTINCT ON ((composite_id).tenant_id, (composite_id).user_id) composite_id, + (composite_id).tenant_id, + (composite_id).user_id, + 'Has done paying'::TEXT AS hasdone + FROM + events + WHERE + events.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND + events.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND + event_type = 'pay') AS subquery_2 + ON + subquery_1.tenant_id = subquery_2.tenant_id AND + subquery_1.user_id = subquery_2.user_id + GROUP BY + subquery_1.tenant_id, + subquery_1.user_id, + hasdone) AS subquery_top +GROUP BY + hasdone; + +-- Union, left join and having subquery pushdown +SELECT + avg(array_length(events, 1)) AS event_average, + count_pay + FROM ( + SELECT + subquery_1.tenant_id, + subquery_1.user_id, + array_agg(event ORDER BY event_time) AS events, + COALESCE(count_pay, 0) AS count_pay + FROM + ( + (SELECT + (users.composite_id).tenant_id, + (users.composite_id).user_id, + 'action=>1'AS event, + events.event_time + FROM + users, + events + WHERE + (users.composite_id).tenant_id = (events.composite_id).tenant_id AND + (users.composite_id).user_id = (events.composite_id).user_id AND + users.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND + users.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND + event_type = 'click') + UNION + (SELECT + (users.composite_id).tenant_id, + (users.composite_id).user_id, + 'action=>2'AS event, + events.event_time + FROM + users, + events + WHERE + (users.composite_id).tenant_id = (events.composite_id).tenant_id AND + (users.composite_id).user_id = (events.composite_id).user_id AND + users.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND + users.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND + event_type = 'submit') + ) AS subquery_1 + LEFT JOIN + (SELECT + (composite_id).tenant_id, + (composite_id).user_id, + COUNT(*) AS count_pay + FROM + events + WHERE + events.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND + events.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND + event_type = 'pay' + GROUP BY + tenant_id, + user_id + HAVING + COUNT(*) > 2) AS subquery_2 + ON + subquery_1.tenant_id = subquery_2.tenant_id AND + subquery_1.user_id = subquery_2.user_id + GROUP BY + subquery_1.tenant_id, + subquery_1.user_id, + count_pay) AS subquery_top +WHERE + array_ndims(events) > 0 +GROUP BY + count_pay +ORDER BY + count_pay; + +-- Lateral join subquery pushdown +SELECT + tenant_id, + user_id, + user_lastseen, + event_array +FROM + (SELECT + tenant_id, + user_id, + max(lastseen) as user_lastseen, + array_agg(event_type ORDER BY event_time) AS event_array + FROM + (SELECT + (composite_id).tenant_id, + (composite_id).user_id, + lastseen + FROM + users + WHERE + composite_id >= '(1, -9223372036854775808)'::user_composite_type AND + composite_id <= '(1, 9223372036854775807)'::user_composite_type + ORDER BY + lastseen DESC + LIMIT + 10 + ) AS subquery_top + LEFT JOIN LATERAL + (SELECT + event_type, + event_time + FROM + events + WHERE + (composite_id).tenant_id = subquery_top.tenant_id AND + (composite_id).user_id = subquery_top.user_id + ORDER BY + event_time DESC + LIMIT + 99) AS subquery_lateral + ON + true + GROUP BY + tenant_id, + user_id + ) AS shard_union +ORDER BY + user_lastseen DESC +LIMIT + 10; + + +-- Same queries above with explain +SET client_min_messages TO DEBUG2; + +-- Simple join subquery pushdown +EXPLAIN SELECT + avg(array_length(events, 1)) AS event_average +FROM + (SELECT + tenant_id, + user_id, + array_agg(event_type ORDER BY event_time) AS events + FROM + (SELECT + (users.composite_id).tenant_id, + (users.composite_id).user_id, + event_type, + events.event_time + FROM + users, + events + WHERE + (users.composite_id).tenant_id = (events.composite_id).tenant_id AND + (users.composite_id).user_id = (events.composite_id).user_id AND + users.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND + users.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND + event_type IN ('click', 'submit', 'pay')) AS subquery + GROUP BY + tenant_id, + user_id) AS subquery; + +-- Union and left join subquery pushdown +EXPLAIN SELECT + avg(array_length(events, 1)) AS event_average, + hasdone +FROM + (SELECT + subquery_1.tenant_id, + subquery_1.user_id, + array_agg(event ORDER BY event_time) AS events, + COALESCE(hasdone, 'Has not done paying') AS hasdone + FROM + ( + (SELECT + (users.composite_id).tenant_id, + (users.composite_id).user_id, + 'action=>1'AS event, + events.event_time + FROM + users, + events + WHERE + (users.composite_id).tenant_id = (events.composite_id).tenant_id AND + (users.composite_id).user_id = (events.composite_id).user_id AND + users.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND + users.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND + event_type = 'click') + UNION + (SELECT + (users.composite_id).tenant_id, + (users.composite_id).user_id, + 'action=>2'AS event, + events.event_time + FROM + users, + events + WHERE + (users.composite_id).tenant_id = (events.composite_id).tenant_id AND + (users.composite_id).user_id = (events.composite_id).user_id AND + users.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND + users.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND + event_type = 'submit') + ) AS subquery_1 + LEFT JOIN + (SELECT + DISTINCT ON ((composite_id).tenant_id, (composite_id).user_id) composite_id, + (composite_id).tenant_id, + (composite_id).user_id, + 'Has done paying'::TEXT AS hasdone + FROM + events + WHERE + events.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND + events.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND + event_type = 'pay') AS subquery_2 + ON + subquery_1.tenant_id = subquery_2.tenant_id AND + subquery_1.user_id = subquery_2.user_id + GROUP BY + subquery_1.tenant_id, + subquery_1.user_id, + hasdone) AS subquery_top +GROUP BY + hasdone; + +-- Union, left join and having subquery pushdown +EXPLAIN SELECT + avg(array_length(events, 1)) AS event_average, + count_pay + FROM ( + SELECT + subquery_1.tenant_id, + subquery_1.user_id, + array_agg(event ORDER BY event_time) AS events, + COALESCE(count_pay, 0) AS count_pay + FROM + ( + (SELECT + (users.composite_id).tenant_id, + (users.composite_id).user_id, + 'action=>1'AS event, + events.event_time + FROM + users, + events + WHERE + (users.composite_id).tenant_id = (events.composite_id).tenant_id AND + (users.composite_id).user_id = (events.composite_id).user_id AND + users.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND + users.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND + event_type = 'click') + UNION + (SELECT + (users.composite_id).tenant_id, + (users.composite_id).user_id, + 'action=>2'AS event, + events.event_time + FROM + users, + events + WHERE + (users.composite_id).tenant_id = (events.composite_id).tenant_id AND + (users.composite_id).user_id = (events.composite_id).user_id AND + users.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND + users.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND + event_type = 'submit') + ) AS subquery_1 + LEFT JOIN + (SELECT + (composite_id).tenant_id, + (composite_id).user_id, + COUNT(*) AS count_pay + FROM + events + WHERE + events.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND + events.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND + event_type = 'pay' + GROUP BY + tenant_id, + user_id + HAVING + COUNT(*) > 2) AS subquery_2 + ON + subquery_1.tenant_id = subquery_2.tenant_id AND + subquery_1.user_id = subquery_2.user_id + GROUP BY + subquery_1.tenant_id, + subquery_1.user_id, + count_pay) AS subquery_top +WHERE + array_ndims(events) > 0 +GROUP BY + count_pay +ORDER BY + count_pay; + +-- Lateral join subquery pushdown +EXPLAIN SELECT + tenant_id, + user_id, + user_lastseen, + event_array +FROM + (SELECT + tenant_id, + user_id, + max(lastseen) as user_lastseen, + array_agg(event_type ORDER BY event_time) AS event_array + FROM + (SELECT + (composite_id).tenant_id, + (composite_id).user_id, + lastseen + FROM + users + WHERE + composite_id >= '(1, -9223372036854775808)'::user_composite_type AND + composite_id <= '(1, 9223372036854775807)'::user_composite_type + ORDER BY + lastseen DESC + LIMIT + 10 + ) AS subquery_top + LEFT JOIN LATERAL + (SELECT + event_type, + event_time + FROM + events + WHERE + (composite_id).tenant_id = subquery_top.tenant_id AND + (composite_id).user_id = subquery_top.user_id + ORDER BY + event_time DESC + LIMIT + 99) AS subquery_lateral + ON + true + GROUP BY + tenant_id, + user_id + ) AS shard_union +ORDER BY + user_lastseen DESC +LIMIT + 10; + +SET citusdb.task_executor_type TO 'real-time'; +SET client_min_messages TO NOTICE; diff --git a/src/test/regress/output/multi_subquery.source b/src/test/regress/output/multi_subquery.source index e0340f3f6..74a9f72f2 100644 --- a/src/test/regress/output/multi_subquery.source +++ b/src/test/regress/output/multi_subquery.source @@ -421,3 +421,661 @@ AS foo; (0 rows) DROP TABLE subquery_pruning_varchar_test_table; +-- Create composite type to use in subquery pushdown +CREATE TYPE user_composite_type AS +( + tenant_id BIGINT, + user_id BIGINT +); +\c - - - :worker_1_port +CREATE TYPE user_composite_type AS +( + tenant_id BIGINT, + user_id BIGINT +); +\c - - - :worker_2_port +CREATE TYPE user_composite_type AS +( + tenant_id BIGINT, + user_id BIGINT +); +\c - - - :master_port +CREATE TABLE events ( + composite_id user_composite_type, + event_id bigint, + event_type character varying(255), + event_time bigint +); +SELECT master_create_distributed_table('events', 'composite_id', 'range'); + master_create_distributed_table +--------------------------------- + +(1 row) + +SELECT master_create_empty_shard('events') AS new_shard_id +\gset +UPDATE pg_dist_shard SET shardminvalue = '(1,1)', shardmaxvalue = '(1,2000000000)' +WHERE shardid = :new_shard_id; +SELECT master_create_empty_shard('events') AS new_shard_id +\gset +UPDATE pg_dist_shard SET shardminvalue = '(1,2000000001)', shardmaxvalue = '(1,4300000000)' +WHERE shardid = :new_shard_id; +SELECT master_create_empty_shard('events') AS new_shard_id +\gset +UPDATE pg_dist_shard SET shardminvalue = '(2,1)', shardmaxvalue = '(2,2000000000)' +WHERE shardid = :new_shard_id; +SELECT master_create_empty_shard('events') AS new_shard_id +\gset +UPDATE pg_dist_shard SET shardminvalue = '(2,2000000001)', shardmaxvalue = '(2,4300000000)' +WHERE shardid = :new_shard_id; +\COPY events FROM STDIN WITH CSV +CREATE TABLE users ( + composite_id user_composite_type, + lastseen bigint +); +SELECT master_create_distributed_table('users', 'composite_id', 'range'); + master_create_distributed_table +--------------------------------- + +(1 row) + +SELECT master_create_empty_shard('users') AS new_shard_id +\gset +UPDATE pg_dist_shard SET shardminvalue = '(1,1)', shardmaxvalue = '(1,2000000000)' +WHERE shardid = :new_shard_id; +SELECT master_create_empty_shard('users') AS new_shard_id +\gset +UPDATE pg_dist_shard SET shardminvalue = '(1,2000000001)', shardmaxvalue = '(1,4300000000)' +WHERE shardid = :new_shard_id; +SELECT master_create_empty_shard('users') AS new_shard_id +\gset +UPDATE pg_dist_shard SET shardminvalue = '(2,1)', shardmaxvalue = '(2,2000000000)' +WHERE shardid = :new_shard_id; +SELECT master_create_empty_shard('users') AS new_shard_id +\gset +UPDATE pg_dist_shard SET shardminvalue = '(2,2000000001)', shardmaxvalue = '(2,4300000000)' +WHERE shardid = :new_shard_id; +\COPY users FROM STDIN WITH CSV +SET citus.subquery_pushdown TO TRUE; +-- Simple join subquery pushdown +SELECT + avg(array_length(events, 1)) AS event_average +FROM + (SELECT + tenant_id, + user_id, + array_agg(event_type ORDER BY event_time) AS events + FROM + (SELECT + (users.composite_id).tenant_id, + (users.composite_id).user_id, + event_type, + events.event_time + FROM + users, + events + WHERE + (users.composite_id).tenant_id = (events.composite_id).tenant_id AND + (users.composite_id).user_id = (events.composite_id).user_id AND + users.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND + users.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND + event_type IN ('click', 'submit', 'pay')) AS subquery + GROUP BY + tenant_id, + user_id) AS subquery; + event_average +-------------------- + 3.6666666666666667 +(1 row) + +-- Union and left join subquery pushdown +SELECT + avg(array_length(events, 1)) AS event_average, + hasdone +FROM + (SELECT + subquery_1.tenant_id, + subquery_1.user_id, + array_agg(event ORDER BY event_time) AS events, + COALESCE(hasdone, 'Has not done paying') AS hasdone + FROM + ( + (SELECT + (users.composite_id).tenant_id, + (users.composite_id).user_id, + 'action=>1'AS event, + events.event_time + FROM + users, + events + WHERE + (users.composite_id).tenant_id = (events.composite_id).tenant_id AND + (users.composite_id).user_id = (events.composite_id).user_id AND + users.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND + users.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND + event_type = 'click') + UNION + (SELECT + (users.composite_id).tenant_id, + (users.composite_id).user_id, + 'action=>2'AS event, + events.event_time + FROM + users, + events + WHERE + (users.composite_id).tenant_id = (events.composite_id).tenant_id AND + (users.composite_id).user_id = (events.composite_id).user_id AND + users.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND + users.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND + event_type = 'submit') + ) AS subquery_1 + LEFT JOIN + (SELECT + DISTINCT ON ((composite_id).tenant_id, (composite_id).user_id) composite_id, + (composite_id).tenant_id, + (composite_id).user_id, + 'Has done paying'::TEXT AS hasdone + FROM + events + WHERE + events.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND + events.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND + event_type = 'pay') AS subquery_2 + ON + subquery_1.tenant_id = subquery_2.tenant_id AND + subquery_1.user_id = subquery_2.user_id + GROUP BY + subquery_1.tenant_id, + subquery_1.user_id, + hasdone) AS subquery_top +GROUP BY + hasdone; + event_average | hasdone +--------------------+--------------------- + 4.0000000000000000 | Has not done paying + 2.5000000000000000 | Has done paying +(2 rows) + +-- Union, left join and having subquery pushdown +SELECT + avg(array_length(events, 1)) AS event_average, + count_pay + FROM ( + SELECT + subquery_1.tenant_id, + subquery_1.user_id, + array_agg(event ORDER BY event_time) AS events, + COALESCE(count_pay, 0) AS count_pay + FROM + ( + (SELECT + (users.composite_id).tenant_id, + (users.composite_id).user_id, + 'action=>1'AS event, + events.event_time + FROM + users, + events + WHERE + (users.composite_id).tenant_id = (events.composite_id).tenant_id AND + (users.composite_id).user_id = (events.composite_id).user_id AND + users.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND + users.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND + event_type = 'click') + UNION + (SELECT + (users.composite_id).tenant_id, + (users.composite_id).user_id, + 'action=>2'AS event, + events.event_time + FROM + users, + events + WHERE + (users.composite_id).tenant_id = (events.composite_id).tenant_id AND + (users.composite_id).user_id = (events.composite_id).user_id AND + users.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND + users.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND + event_type = 'submit') + ) AS subquery_1 + LEFT JOIN + (SELECT + (composite_id).tenant_id, + (composite_id).user_id, + COUNT(*) AS count_pay + FROM + events + WHERE + events.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND + events.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND + event_type = 'pay' + GROUP BY + tenant_id, + user_id + HAVING + COUNT(*) > 2) AS subquery_2 + ON + subquery_1.tenant_id = subquery_2.tenant_id AND + subquery_1.user_id = subquery_2.user_id + GROUP BY + subquery_1.tenant_id, + subquery_1.user_id, + count_pay) AS subquery_top +WHERE + array_ndims(events) > 0 +GROUP BY + count_pay +ORDER BY + count_pay; + event_average | count_pay +--------------------+----------- + 3.0000000000000000 | 0 +(1 row) + +-- Lateral join subquery pushdown +SELECT + tenant_id, + user_id, + user_lastseen, + event_array +FROM + (SELECT + tenant_id, + user_id, + max(lastseen) as user_lastseen, + array_agg(event_type ORDER BY event_time) AS event_array + FROM + (SELECT + (composite_id).tenant_id, + (composite_id).user_id, + lastseen + FROM + users + WHERE + composite_id >= '(1, -9223372036854775808)'::user_composite_type AND + composite_id <= '(1, 9223372036854775807)'::user_composite_type + ORDER BY + lastseen DESC + LIMIT + 10 + ) AS subquery_top + LEFT JOIN LATERAL + (SELECT + event_type, + event_time + FROM + events + WHERE + (composite_id).tenant_id = subquery_top.tenant_id AND + (composite_id).user_id = subquery_top.user_id + ORDER BY + event_time DESC + LIMIT + 99) AS subquery_lateral + ON + true + GROUP BY + tenant_id, + user_id + ) AS shard_union +ORDER BY + user_lastseen DESC +LIMIT + 10; + tenant_id | user_id | user_lastseen | event_array +-----------+---------+---------------+---------------------------- + 1 | 1003 | 1472807315 | {click,click,click,submit} + 1 | 1002 | 1472807215 | {click,click,submit,pay} + 1 | 1001 | 1472807115 | {click,submit,pay} +(3 rows) + +-- Same queries above with explain +SET client_min_messages TO DEBUG2; +-- Simple join subquery pushdown +EXPLAIN SELECT + avg(array_length(events, 1)) AS event_average +FROM + (SELECT + tenant_id, + user_id, + array_agg(event_type ORDER BY event_time) AS events + FROM + (SELECT + (users.composite_id).tenant_id, + (users.composite_id).user_id, + event_type, + events.event_time + FROM + users, + events + WHERE + (users.composite_id).tenant_id = (events.composite_id).tenant_id AND + (users.composite_id).user_id = (events.composite_id).user_id AND + users.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND + users.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND + event_type IN ('click', 'submit', 'pay')) AS subquery + GROUP BY + tenant_id, + user_id) AS subquery; +DEBUG: predicate pruning for shardId 270015 +DEBUG: predicate pruning for shardId 270016 +DEBUG: predicate pruning for shardId 270011 +DEBUG: predicate pruning for shardId 270012 + QUERY PLAN +---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- + Distributed Query into pg_merge_job_270014 + Executor: Real-Time + Task Count: 2 + Tasks Shown: One of 2 + -> Task + Node: host=localhost port=57637 dbname=regression + -> Aggregate (cost=40.01..40.02 rows=1 width=32) + -> GroupAggregate (cost=39.89..39.99 rows=1 width=556) + Group Key: ((users.composite_id).tenant_id), ((users.composite_id).user_id) + -> Merge Join (cost=39.89..39.97 rows=1 width=556) + Merge Cond: ((((users.composite_id).tenant_id) = ((events.composite_id).tenant_id)) AND (((users.composite_id).user_id) = ((events.composite_id).user_id))) + -> Sort (cost=28.08..28.09 rows=6 width=32) + Sort Key: ((users.composite_id).tenant_id), ((users.composite_id).user_id) + -> Seq Scan on users_270013 users (cost=0.00..28.00 rows=6 width=32) + Filter: ((composite_id >= '(1,-9223372036854775808)'::user_composite_type) AND (composite_id <= '(1,9223372036854775807)'::user_composite_type)) + -> Sort (cost=11.81..11.82 rows=3 width=556) + Sort Key: ((events.composite_id).tenant_id), ((events.composite_id).user_id) + -> Seq Scan on events_270009 events (cost=0.00..11.79 rows=3 width=556) + Filter: ((event_type)::text = ANY ('{click,submit,pay}'::text[])) + Master Query + -> Aggregate (cost=0.01..0.02 rows=1 width=0) + -> Seq Scan on pg_merge_job_270014 (cost=0.00..0.00 rows=0 width=0) +(22 rows) + +-- Union and left join subquery pushdown +EXPLAIN SELECT + avg(array_length(events, 1)) AS event_average, + hasdone +FROM + (SELECT + subquery_1.tenant_id, + subquery_1.user_id, + array_agg(event ORDER BY event_time) AS events, + COALESCE(hasdone, 'Has not done paying') AS hasdone + FROM + ( + (SELECT + (users.composite_id).tenant_id, + (users.composite_id).user_id, + 'action=>1'AS event, + events.event_time + FROM + users, + events + WHERE + (users.composite_id).tenant_id = (events.composite_id).tenant_id AND + (users.composite_id).user_id = (events.composite_id).user_id AND + users.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND + users.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND + event_type = 'click') + UNION + (SELECT + (users.composite_id).tenant_id, + (users.composite_id).user_id, + 'action=>2'AS event, + events.event_time + FROM + users, + events + WHERE + (users.composite_id).tenant_id = (events.composite_id).tenant_id AND + (users.composite_id).user_id = (events.composite_id).user_id AND + users.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND + users.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND + event_type = 'submit') + ) AS subquery_1 + LEFT JOIN + (SELECT + DISTINCT ON ((composite_id).tenant_id, (composite_id).user_id) composite_id, + (composite_id).tenant_id, + (composite_id).user_id, + 'Has done paying'::TEXT AS hasdone + FROM + events + WHERE + events.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND + events.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND + event_type = 'pay') AS subquery_2 + ON + subquery_1.tenant_id = subquery_2.tenant_id AND + subquery_1.user_id = subquery_2.user_id + GROUP BY + subquery_1.tenant_id, + subquery_1.user_id, + hasdone) AS subquery_top +GROUP BY + hasdone; +DEBUG: predicate pruning for shardId 270015 +DEBUG: predicate pruning for shardId 270016 +DEBUG: predicate pruning for shardId 270011 +DEBUG: predicate pruning for shardId 270012 +DEBUG: predicate pruning for shardId 270015 +DEBUG: predicate pruning for shardId 270016 +DEBUG: predicate pruning for shardId 270011 +DEBUG: predicate pruning for shardId 270012 +DEBUG: predicate pruning for shardId 270011 +DEBUG: predicate pruning for shardId 270012 +DEBUG: building index "pg_toast_17247_index" on table "pg_toast_17247" + QUERY PLAN +------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- + Distributed Query into pg_merge_job_270015 + Executor: Real-Time + Task Count: 2 + Tasks Shown: One of 2 + -> Task + Node: host=localhost port=57637 dbname=regression + -> HashAggregate (cost=91.94..91.96 rows=2 width=64) + Group Key: COALESCE(('Has done paying'::text), 'Has not done paying'::text) + -> GroupAggregate (cost=91.85..91.90 rows=2 width=88) + Group Key: ((users.composite_id).tenant_id), ((users.composite_id).user_id), ('Has done paying'::text) + -> Sort (cost=91.85..91.85 rows=2 width=88) + Sort Key: ((users.composite_id).tenant_id), ((users.composite_id).user_id), ('Has done paying'::text) + -> Merge Left Join (cost=91.75..91.84 rows=2 width=88) + Merge Cond: ((((users.composite_id).tenant_id) = ((events_2.composite_id).tenant_id)) AND (((users.composite_id).user_id) = ((events_2.composite_id).user_id))) + -> Unique (cost=79.46..79.48 rows=2 width=40) + -> Sort (cost=79.46..79.47 rows=2 width=40) + Sort Key: ((users.composite_id).tenant_id), ((users.composite_id).user_id), ('action=>1'::text), events.event_time + -> Append (cost=0.00..79.45 rows=2 width=40) + -> Nested Loop (cost=0.00..39.72 rows=1 width=40) + Join Filter: (((users.composite_id).tenant_id = (events.composite_id).tenant_id) AND ((users.composite_id).user_id = (events.composite_id).user_id)) + -> Seq Scan on events_270009 events (cost=0.00..11.62 rows=1 width=40) + Filter: ((event_type)::text = 'click'::text) + -> Seq Scan on users_270013 users (cost=0.00..28.00 rows=6 width=32) + Filter: ((composite_id >= '(1,-9223372036854775808)'::user_composite_type) AND (composite_id <= '(1,9223372036854775807)'::user_composite_type)) + -> Nested Loop (cost=0.00..39.72 rows=1 width=40) + Join Filter: (((users_1.composite_id).tenant_id = (events_1.composite_id).tenant_id) AND ((users_1.composite_id).user_id = (events_1.composite_id).user_id)) + -> Seq Scan on events_270009 events_1 (cost=0.00..11.62 rows=1 width=40) + Filter: ((event_type)::text = 'submit'::text) + -> Seq Scan on users_270013 users_1 (cost=0.00..28.00 rows=6 width=32) + Filter: ((composite_id >= '(1,-9223372036854775808)'::user_composite_type) AND (composite_id <= '(1,9223372036854775807)'::user_composite_type)) + -> Materialize (cost=12.29..12.31 rows=1 width=48) + -> Unique (cost=12.29..12.30 rows=1 width=32) + -> Sort (cost=12.29..12.29 rows=1 width=32) + Sort Key: ((events_2.composite_id).tenant_id), ((events_2.composite_id).user_id) + -> Seq Scan on events_270009 events_2 (cost=0.00..12.28 rows=1 width=32) + Filter: ((composite_id >= '(1,-9223372036854775808)'::user_composite_type) AND (composite_id <= '(1,9223372036854775807)'::user_composite_type) AND ((event_type)::text = 'pay'::text)) + Master Query + -> HashAggregate (cost=0.00..0.18 rows=10 width=0) + Group Key: intermediate_column_270015_2 + -> Seq Scan on pg_merge_job_270015 (cost=0.00..0.00 rows=0 width=0) +(40 rows) + +-- Union, left join and having subquery pushdown +EXPLAIN SELECT + avg(array_length(events, 1)) AS event_average, + count_pay + FROM ( + SELECT + subquery_1.tenant_id, + subquery_1.user_id, + array_agg(event ORDER BY event_time) AS events, + COALESCE(count_pay, 0) AS count_pay + FROM + ( + (SELECT + (users.composite_id).tenant_id, + (users.composite_id).user_id, + 'action=>1'AS event, + events.event_time + FROM + users, + events + WHERE + (users.composite_id).tenant_id = (events.composite_id).tenant_id AND + (users.composite_id).user_id = (events.composite_id).user_id AND + users.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND + users.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND + event_type = 'click') + UNION + (SELECT + (users.composite_id).tenant_id, + (users.composite_id).user_id, + 'action=>2'AS event, + events.event_time + FROM + users, + events + WHERE + (users.composite_id).tenant_id = (events.composite_id).tenant_id AND + (users.composite_id).user_id = (events.composite_id).user_id AND + users.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND + users.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND + event_type = 'submit') + ) AS subquery_1 + LEFT JOIN + (SELECT + (composite_id).tenant_id, + (composite_id).user_id, + COUNT(*) AS count_pay + FROM + events + WHERE + events.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND + events.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND + event_type = 'pay' + GROUP BY + tenant_id, + user_id + HAVING + COUNT(*) > 2) AS subquery_2 + ON + subquery_1.tenant_id = subquery_2.tenant_id AND + subquery_1.user_id = subquery_2.user_id + GROUP BY + subquery_1.tenant_id, + subquery_1.user_id, + count_pay) AS subquery_top +WHERE + array_ndims(events) > 0 +GROUP BY + count_pay +ORDER BY + count_pay; +DEBUG: predicate pruning for shardId 270015 +DEBUG: predicate pruning for shardId 270016 +DEBUG: predicate pruning for shardId 270011 +DEBUG: predicate pruning for shardId 270012 +DEBUG: predicate pruning for shardId 270015 +DEBUG: predicate pruning for shardId 270016 +DEBUG: predicate pruning for shardId 270011 +DEBUG: predicate pruning for shardId 270012 +DEBUG: predicate pruning for shardId 270011 +DEBUG: predicate pruning for shardId 270012 +ERROR: bogus varattno for OUTER_VAR var: 3 +-- Lateral join subquery pushdown +EXPLAIN SELECT + tenant_id, + user_id, + user_lastseen, + event_array +FROM + (SELECT + tenant_id, + user_id, + max(lastseen) as user_lastseen, + array_agg(event_type ORDER BY event_time) AS event_array + FROM + (SELECT + (composite_id).tenant_id, + (composite_id).user_id, + lastseen + FROM + users + WHERE + composite_id >= '(1, -9223372036854775808)'::user_composite_type AND + composite_id <= '(1, 9223372036854775807)'::user_composite_type + ORDER BY + lastseen DESC + LIMIT + 10 + ) AS subquery_top + LEFT JOIN LATERAL + (SELECT + event_type, + event_time + FROM + events + WHERE + (composite_id).tenant_id = subquery_top.tenant_id AND + (composite_id).user_id = subquery_top.user_id + ORDER BY + event_time DESC + LIMIT + 99) AS subquery_lateral + ON + true + GROUP BY + tenant_id, + user_id + ) AS shard_union +ORDER BY + user_lastseen DESC +LIMIT + 10; +DEBUG: push down of limit count: 10 +DEBUG: predicate pruning for shardId 270015 +DEBUG: predicate pruning for shardId 270016 +DEBUG: predicate pruning for shardId 270011 +DEBUG: predicate pruning for shardId 270012 +DEBUG: building index "pg_toast_17256_index" on table "pg_toast_17256" + QUERY PLAN +---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- + Distributed Query into pg_merge_job_270017 + Executor: Real-Time + Task Count: 2 + Tasks Shown: One of 2 + -> Task + Node: host=localhost port=57637 dbname=regression + -> Limit (cost=100.43..100.44 rows=6 width=56) + -> Sort (cost=100.43..100.44 rows=6 width=56) + Sort Key: (max(users.lastseen)) DESC + -> GroupAggregate (cost=100.14..100.29 rows=6 width=548) + Group Key: ((users.composite_id).tenant_id), ((users.composite_id).user_id) + -> Sort (cost=100.14..100.16 rows=6 width=548) + Sort Key: ((users.composite_id).tenant_id), ((users.composite_id).user_id) + -> Nested Loop Left Join (cost=40.04..100.06 rows=6 width=548) + -> Limit (cost=28.08..28.09 rows=6 width=40) + -> Sort (cost=28.08..28.09 rows=6 width=40) + Sort Key: users.lastseen DESC + -> Seq Scan on users_270013 users (cost=0.00..28.00 rows=6 width=40) + Filter: ((composite_id >= '(1,-9223372036854775808)'::user_composite_type) AND (composite_id <= '(1,9223372036854775807)'::user_composite_type)) + -> Limit (cost=11.96..11.96 rows=1 width=524) + -> Sort (cost=11.96..11.96 rows=1 width=524) + Sort Key: events.event_time DESC + -> Seq Scan on events_270009 events (cost=0.00..11.95 rows=1 width=524) + Filter: (((composite_id).tenant_id = ((users.composite_id).tenant_id)) AND ((composite_id).user_id = ((users.composite_id).user_id))) + Master Query + -> Limit (cost=0.01..0.02 rows=0 width=0) + -> Sort (cost=0.01..0.02 rows=0 width=0) + Sort Key: intermediate_column_270017_2 DESC + -> Seq Scan on pg_merge_job_270017 (cost=0.00..0.00 rows=0 width=0) +(29 rows) + +SET citusdb.task_executor_type TO 'real-time'; +SET client_min_messages TO NOTICE;