Merge pull request #755 from citusdata/fix_754_add_outer_join_clause_list_check

Add outer join clause list extraction for subquery pushdown logic
pull/768/head
Metin Döşlü 2016-09-02 15:01:49 +03:00 committed by GitHub
commit 6333f9ba6f
5 changed files with 1278 additions and 33 deletions

View File

@ -3400,10 +3400,10 @@ SupportedLateralQuery(Query *parentQuery, Query *lateralQuery)
bool supportedLateralQuery = false;
List *outerCompositeFieldList = NIL;
List *localCompositeFieldList = NIL;
List *whereClauseList = WhereClauseList(lateralQuery->jointree);
ListCell *qualifierCell = NULL;
ListCell *whereClauseCell = NULL;
foreach(whereClauseCell, whereClauseList)
List *qualifierList = QualifierList(lateralQuery->jointree);
foreach(qualifierCell, qualifierList)
{
OpExpr *operatorExpression = NULL;
List *argumentList = NIL;
@ -3417,13 +3417,13 @@ SupportedLateralQuery(Query *parentQuery, Query *lateralQuery)
bool outerColumnIsPartitionColumn = false;
bool localColumnIsPartitionColumn = false;
Node *clause = (Node *) lfirst(whereClauseCell);
if (!IsA(clause, OpExpr))
Node *qualifier = (Node *) lfirst(qualifierCell);
if (!IsA(qualifier, OpExpr))
{
continue;
}
operatorExpression = (OpExpr *) clause;
operatorExpression = (OpExpr *) qualifier;
argumentList = operatorExpression->args;
/*
@ -3566,8 +3566,8 @@ JoinOnPartitionColumn(Query *query)
bool joinOnPartitionColumn = false;
List *leftCompositeFieldList = NIL;
List *rightCompositeFieldList = NIL;
List *whereClauseList = WhereClauseList(query->jointree);
List *joinClauseList = JoinClauseList(whereClauseList);
List *qualifierList = QualifierList(query->jointree);
List *joinClauseList = JoinClauseList(qualifierList);
ListCell *joinClauseCell = NULL;
foreach(joinClauseCell, joinClauseList)

View File

@ -38,6 +38,14 @@
bool SubqueryPushdown = false; /* is subquery pushdown enabled */
/* Struct to differentiate different qualifier types in an expression tree walker */
typedef struct QualifierWalkerContext
{
List *baseQualifierList;
List *outerJoinQualifierList;
} QualifierWalkerContext;
/* Function pointer type definition for apply join rule functions */
typedef MultiNode *(*RuleApplyFunction) (MultiNode *leftNode, MultiNode *rightNode,
Var *partitionColumn, JoinType joinType,
@ -56,7 +64,8 @@ static bool HasOuterJoinWalker(Node *node, void *maxJoinLevel);
static bool HasComplexJoinOrder(Query *queryTree);
static bool HasComplexRangeTableType(Query *queryTree);
static void ValidateClauseList(List *clauseList);
static bool ExtractFromExpressionWalker(Node *node, List **qualifierList);
static bool ExtractFromExpressionWalker(Node *node,
QualifierWalkerContext *walkerContext);
static List * MultiTableNodeList(List *tableEntryList, List *rangeTableList);
static List * AddMultiCollectNodes(List *tableNodeList);
static MultiNode * MultiJoinTree(List *joinOrderList, List *collectTableList,
@ -720,23 +729,48 @@ ExtractRangeTableIndexWalker(Node *node, List **rangeTableIndexList)
/*
* WhereClauseList walks over the FROM expression in the query tree, and builds
* a list of all clauses from the expression tree. The function checks for both
* implicitly and explicitly defined clauses. Explicit clauses are expressed as
* "SELECT ... FROM R1 INNER JOIN R2 ON R1.A = R2.A". Implicit joins differ in
* that they live in the WHERE clause, and are expressed as "SELECT ... FROM
* ... WHERE R1.a = R2.a".
* implicitly and explicitly defined clauses, but only selects INNER join
* explicit clauses, and skips any outer-join clauses. Explicit clauses are
* expressed as "SELECT ... FROM R1 INNER JOIN R2 ON R1.A = R2.A". Implicit
* joins differ in that they live in the WHERE clause, and are expressed as
* "SELECT ... FROM ... WHERE R1.a = R2.a".
*/
List *
WhereClauseList(FromExpr *fromExpr)
{
FromExpr *fromExprCopy = copyObject(fromExpr);
QualifierWalkerContext *walkerContext = palloc0(sizeof(QualifierWalkerContext));
List *whereClauseList = NIL;
ExtractFromExpressionWalker((Node *) fromExprCopy, &whereClauseList);
ExtractFromExpressionWalker((Node *) fromExprCopy, walkerContext);
whereClauseList = walkerContext->baseQualifierList;
return whereClauseList;
}
/*
* QualifierList walks over the FROM expression in the query tree, and builds
* a list of all qualifiers from the expression tree. The function checks for
* both implicitly and explicitly defined qualifiers. Note that this function
* is very similar to WhereClauseList(), but QualifierList() also includes
* outer-join clauses.
*/
List *
QualifierList(FromExpr *fromExpr)
{
FromExpr *fromExprCopy = copyObject(fromExpr);
QualifierWalkerContext *walkerContext = palloc0(sizeof(QualifierWalkerContext));
List *qualifierList = NIL;
ExtractFromExpressionWalker((Node *) fromExprCopy, walkerContext);
qualifierList = list_concat(qualifierList, walkerContext->baseQualifierList);
qualifierList = list_concat(qualifierList, walkerContext->outerJoinQualifierList);
return qualifierList;
}
/*
* ValidateClauseList walks over the given list of clauses, and checks that we
* can recognize all the clauses. This function ensures that we do not drop an
@ -790,8 +824,15 @@ JoinClauseList(List *whereClauseList)
/*
* ExtractFromExpressionWalker walks over a FROM expression, and finds all
* explicit qualifiers in the expression. The function looks at join and from
* expression nodes to find explicit qualifiers, and returns these qualifiers.
* implicit and explicit qualifiers in the expression. The function looks at
* join and from expression nodes to find qualifiers, and returns these
* qualifiers.
*
* Note that we don't want outer join clauses in regular outer join planning,
* but we need outer join clauses in subquery pushdown prerequisite checks.
* Therefore, outer join qualifiers are returned in a different list than other
* qualifiers inside the given walker context. For this reason, we return two
* qualifier lists.
*
* Note that we check if the qualifier node in join and from expression nodes
* is a list node. If it is not a list node which is the case for subqueries,
@ -803,7 +844,7 @@ JoinClauseList(List *whereClauseList)
* query tree.
*/
static bool
ExtractFromExpressionWalker(Node *node, List **qualifierList)
ExtractFromExpressionWalker(Node *node, QualifierWalkerContext *walkerContext)
{
bool walkerResult = false;
if (node == NULL)
@ -824,11 +865,7 @@ ExtractFromExpressionWalker(Node *node, List **qualifierList)
Node *joinQualifiersNode = joinExpression->quals;
JoinType joinType = joinExpression->jointype;
/*
* We only extract qualifiers from inner join clauses, which can be
* treated as WHERE clauses.
*/
if (joinQualifiersNode != NULL && joinType == JOIN_INNER)
if (joinQualifiersNode != NULL)
{
if (IsA(joinQualifiersNode, List))
{
@ -841,8 +878,18 @@ ExtractFromExpressionWalker(Node *node, List **qualifierList)
joinClause = (Node *) canonicalize_qual((Expr *) joinClause);
joinQualifierList = make_ands_implicit((Expr *) joinClause);
}
}
(*qualifierList) = list_concat(*qualifierList, joinQualifierList);
/* return outer join clauses in a separate list */
if (joinType == JOIN_INNER)
{
walkerContext->baseQualifierList =
list_concat(walkerContext->baseQualifierList, joinQualifierList);
}
else if (IS_OUTER_JOIN(joinType))
{
walkerContext->outerJoinQualifierList =
list_concat(walkerContext->outerJoinQualifierList, joinQualifierList);
}
}
else if (IsA(node, FromExpr))
@ -865,12 +912,13 @@ ExtractFromExpressionWalker(Node *node, List **qualifierList)
fromQualifierList = make_ands_implicit((Expr *) fromClause);
}
(*qualifierList) = list_concat(*qualifierList, fromQualifierList);
walkerContext->baseQualifierList =
list_concat(walkerContext->baseQualifierList, fromQualifierList);
}
}
walkerResult = expression_tree_walker(node, ExtractFromExpressionWalker,
(void *) qualifierList);
(void *) walkerContext);
return walkerResult;
}
@ -1867,8 +1915,8 @@ static MultiNode *
SubqueryPushdownMultiPlanTree(Query *queryTree, List *subqueryEntryList)
{
List *targetEntryList = queryTree->targetList;
List *whereClauseList = NIL;
List *whereClauseColumnList = NIL;
List *qualifierList = NIL;
List *qualifierColumnList = NIL;
List *targetListColumnList = NIL;
List *columnList = NIL;
ListCell *columnCell = NULL;
@ -1884,9 +1932,9 @@ SubqueryPushdownMultiPlanTree(Query *queryTree, List *subqueryEntryList)
ErrorIfQueryNotSupported(queryTree);
ErrorIfSubqueryJoin(queryTree);
/* extract where clause qualifiers and verify we can plan for them */
whereClauseList = WhereClauseList(queryTree->jointree);
ValidateClauseList(whereClauseList);
/* extract qualifiers and verify we can plan for them */
qualifierList = QualifierList(queryTree->jointree);
ValidateClauseList(qualifierList);
/*
* We disregard pulled subqueries. This changes order of range table list.
@ -1897,10 +1945,10 @@ SubqueryPushdownMultiPlanTree(Query *queryTree, List *subqueryEntryList)
*/
Assert(list_length(subqueryEntryList) == 1);
whereClauseColumnList = pull_var_clause_default((Node *) whereClauseList);
qualifierColumnList = pull_var_clause_default((Node *) qualifierList);
targetListColumnList = pull_var_clause_default((Node *) targetEntryList);
columnList = list_concat(whereClauseColumnList, targetListColumnList);
columnList = list_concat(qualifierColumnList, targetListColumnList);
foreach(columnCell, columnList)
{
Var *column = (Var *) lfirst(columnCell);
@ -1915,7 +1963,7 @@ SubqueryPushdownMultiPlanTree(Query *queryTree, List *subqueryEntryList)
currentTopNode = (MultiNode *) subqueryCollectNode;
/* build select node if the query has selection criteria */
selectNode = MultiSelectNode(whereClauseList);
selectNode = MultiSelectNode(qualifierList);
if (selectNode != NULL)
{
SetChild((MultiUnaryNode *) selectNode, currentTopNode);

View File

@ -198,6 +198,7 @@ extern bool IsJoinClause(Node *clause);
extern List * SubqueryEntryList(Query *queryTree);
extern bool ExtractRangeTableIndexWalker(Node *node, List **rangeTableIndexList);
extern List * WhereClauseList(FromExpr *fromExpr);
extern List * QualifierList(FromExpr *fromExpr);
extern List * TableEntryList(List *rangeTableList);
extern bool ExtractRangeTableRelationWalker(Node *node, List **rangeTableList);
extern bool ExtractRangeTableEntryWalker(Node *node, List **rangeTableList);

View File

@ -377,3 +377,541 @@ SELECT * FROM
AS foo;
DROP TABLE subquery_pruning_varchar_test_table;
-- Create composite type to use in subquery pushdown
CREATE TYPE user_composite_type AS
(
tenant_id BIGINT,
user_id BIGINT
);
\c - - - :worker_1_port
CREATE TYPE user_composite_type AS
(
tenant_id BIGINT,
user_id BIGINT
);
\c - - - :worker_2_port
CREATE TYPE user_composite_type AS
(
tenant_id BIGINT,
user_id BIGINT
);
\c - - - :master_port
CREATE TABLE events (
composite_id user_composite_type,
event_id bigint,
event_type character varying(255),
event_time bigint
);
SELECT master_create_distributed_table('events', 'composite_id', 'range');
SELECT master_create_empty_shard('events') AS new_shard_id
\gset
UPDATE pg_dist_shard SET shardminvalue = '(1,1)', shardmaxvalue = '(1,2000000000)'
WHERE shardid = :new_shard_id;
SELECT master_create_empty_shard('events') AS new_shard_id
\gset
UPDATE pg_dist_shard SET shardminvalue = '(1,2000000001)', shardmaxvalue = '(1,4300000000)'
WHERE shardid = :new_shard_id;
SELECT master_create_empty_shard('events') AS new_shard_id
\gset
UPDATE pg_dist_shard SET shardminvalue = '(2,1)', shardmaxvalue = '(2,2000000000)'
WHERE shardid = :new_shard_id;
SELECT master_create_empty_shard('events') AS new_shard_id
\gset
UPDATE pg_dist_shard SET shardminvalue = '(2,2000000001)', shardmaxvalue = '(2,4300000000)'
WHERE shardid = :new_shard_id;
\COPY events FROM STDIN WITH CSV
"(1,1001)",20001,click,1472807012
"(1,1001)",20002,submit,1472807015
"(1,1001)",20003,pay,1472807020
"(1,1002)",20010,click,1472807022
"(1,1002)",20011,click,1472807023
"(1,1002)",20012,submit,1472807025
"(1,1002)",20013,pay,1472807030
"(1,1003)",20014,click,1472807032
"(1,1003)",20015,click,1472807033
"(1,1003)",20016,click,1472807034
"(1,1003)",20017,submit,1472807035
\.
CREATE TABLE users (
composite_id user_composite_type,
lastseen bigint
);
SELECT master_create_distributed_table('users', 'composite_id', 'range');
SELECT master_create_empty_shard('users') AS new_shard_id
\gset
UPDATE pg_dist_shard SET shardminvalue = '(1,1)', shardmaxvalue = '(1,2000000000)'
WHERE shardid = :new_shard_id;
SELECT master_create_empty_shard('users') AS new_shard_id
\gset
UPDATE pg_dist_shard SET shardminvalue = '(1,2000000001)', shardmaxvalue = '(1,4300000000)'
WHERE shardid = :new_shard_id;
SELECT master_create_empty_shard('users') AS new_shard_id
\gset
UPDATE pg_dist_shard SET shardminvalue = '(2,1)', shardmaxvalue = '(2,2000000000)'
WHERE shardid = :new_shard_id;
SELECT master_create_empty_shard('users') AS new_shard_id
\gset
UPDATE pg_dist_shard SET shardminvalue = '(2,2000000001)', shardmaxvalue = '(2,4300000000)'
WHERE shardid = :new_shard_id;
\COPY users FROM STDIN WITH CSV
"(1,1001)",1472807115
"(1,1002)",1472807215
"(1,1003)",1472807315
\.
SET citus.subquery_pushdown TO TRUE;
-- Simple join subquery pushdown
SELECT
avg(array_length(events, 1)) AS event_average
FROM
(SELECT
tenant_id,
user_id,
array_agg(event_type ORDER BY event_time) AS events
FROM
(SELECT
(users.composite_id).tenant_id,
(users.composite_id).user_id,
event_type,
events.event_time
FROM
users,
events
WHERE
(users.composite_id).tenant_id = (events.composite_id).tenant_id AND
(users.composite_id).user_id = (events.composite_id).user_id AND
users.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND
users.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND
event_type IN ('click', 'submit', 'pay')) AS subquery
GROUP BY
tenant_id,
user_id) AS subquery;
-- Union and left join subquery pushdown
SELECT
avg(array_length(events, 1)) AS event_average,
hasdone
FROM
(SELECT
subquery_1.tenant_id,
subquery_1.user_id,
array_agg(event ORDER BY event_time) AS events,
COALESCE(hasdone, 'Has not done paying') AS hasdone
FROM
(
(SELECT
(users.composite_id).tenant_id,
(users.composite_id).user_id,
'action=>1'AS event,
events.event_time
FROM
users,
events
WHERE
(users.composite_id).tenant_id = (events.composite_id).tenant_id AND
(users.composite_id).user_id = (events.composite_id).user_id AND
users.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND
users.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND
event_type = 'click')
UNION
(SELECT
(users.composite_id).tenant_id,
(users.composite_id).user_id,
'action=>2'AS event,
events.event_time
FROM
users,
events
WHERE
(users.composite_id).tenant_id = (events.composite_id).tenant_id AND
(users.composite_id).user_id = (events.composite_id).user_id AND
users.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND
users.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND
event_type = 'submit')
) AS subquery_1
LEFT JOIN
(SELECT
DISTINCT ON ((composite_id).tenant_id, (composite_id).user_id) composite_id,
(composite_id).tenant_id,
(composite_id).user_id,
'Has done paying'::TEXT AS hasdone
FROM
events
WHERE
events.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND
events.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND
event_type = 'pay') AS subquery_2
ON
subquery_1.tenant_id = subquery_2.tenant_id AND
subquery_1.user_id = subquery_2.user_id
GROUP BY
subquery_1.tenant_id,
subquery_1.user_id,
hasdone) AS subquery_top
GROUP BY
hasdone;
-- Union, left join and having subquery pushdown
SELECT
avg(array_length(events, 1)) AS event_average,
count_pay
FROM (
SELECT
subquery_1.tenant_id,
subquery_1.user_id,
array_agg(event ORDER BY event_time) AS events,
COALESCE(count_pay, 0) AS count_pay
FROM
(
(SELECT
(users.composite_id).tenant_id,
(users.composite_id).user_id,
'action=>1'AS event,
events.event_time
FROM
users,
events
WHERE
(users.composite_id).tenant_id = (events.composite_id).tenant_id AND
(users.composite_id).user_id = (events.composite_id).user_id AND
users.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND
users.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND
event_type = 'click')
UNION
(SELECT
(users.composite_id).tenant_id,
(users.composite_id).user_id,
'action=>2'AS event,
events.event_time
FROM
users,
events
WHERE
(users.composite_id).tenant_id = (events.composite_id).tenant_id AND
(users.composite_id).user_id = (events.composite_id).user_id AND
users.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND
users.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND
event_type = 'submit')
) AS subquery_1
LEFT JOIN
(SELECT
(composite_id).tenant_id,
(composite_id).user_id,
COUNT(*) AS count_pay
FROM
events
WHERE
events.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND
events.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND
event_type = 'pay'
GROUP BY
tenant_id,
user_id
HAVING
COUNT(*) > 2) AS subquery_2
ON
subquery_1.tenant_id = subquery_2.tenant_id AND
subquery_1.user_id = subquery_2.user_id
GROUP BY
subquery_1.tenant_id,
subquery_1.user_id,
count_pay) AS subquery_top
WHERE
array_ndims(events) > 0
GROUP BY
count_pay
ORDER BY
count_pay;
-- Lateral join subquery pushdown
SELECT
tenant_id,
user_id,
user_lastseen,
event_array
FROM
(SELECT
tenant_id,
user_id,
max(lastseen) as user_lastseen,
array_agg(event_type ORDER BY event_time) AS event_array
FROM
(SELECT
(composite_id).tenant_id,
(composite_id).user_id,
lastseen
FROM
users
WHERE
composite_id >= '(1, -9223372036854775808)'::user_composite_type AND
composite_id <= '(1, 9223372036854775807)'::user_composite_type
ORDER BY
lastseen DESC
LIMIT
10
) AS subquery_top
LEFT JOIN LATERAL
(SELECT
event_type,
event_time
FROM
events
WHERE
(composite_id).tenant_id = subquery_top.tenant_id AND
(composite_id).user_id = subquery_top.user_id
ORDER BY
event_time DESC
LIMIT
99) AS subquery_lateral
ON
true
GROUP BY
tenant_id,
user_id
) AS shard_union
ORDER BY
user_lastseen DESC
LIMIT
10;
-- Same queries above with explain
SET client_min_messages TO DEBUG2;
-- Simple join subquery pushdown
EXPLAIN SELECT
avg(array_length(events, 1)) AS event_average
FROM
(SELECT
tenant_id,
user_id,
array_agg(event_type ORDER BY event_time) AS events
FROM
(SELECT
(users.composite_id).tenant_id,
(users.composite_id).user_id,
event_type,
events.event_time
FROM
users,
events
WHERE
(users.composite_id).tenant_id = (events.composite_id).tenant_id AND
(users.composite_id).user_id = (events.composite_id).user_id AND
users.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND
users.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND
event_type IN ('click', 'submit', 'pay')) AS subquery
GROUP BY
tenant_id,
user_id) AS subquery;
-- Union and left join subquery pushdown
EXPLAIN SELECT
avg(array_length(events, 1)) AS event_average,
hasdone
FROM
(SELECT
subquery_1.tenant_id,
subquery_1.user_id,
array_agg(event ORDER BY event_time) AS events,
COALESCE(hasdone, 'Has not done paying') AS hasdone
FROM
(
(SELECT
(users.composite_id).tenant_id,
(users.composite_id).user_id,
'action=>1'AS event,
events.event_time
FROM
users,
events
WHERE
(users.composite_id).tenant_id = (events.composite_id).tenant_id AND
(users.composite_id).user_id = (events.composite_id).user_id AND
users.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND
users.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND
event_type = 'click')
UNION
(SELECT
(users.composite_id).tenant_id,
(users.composite_id).user_id,
'action=>2'AS event,
events.event_time
FROM
users,
events
WHERE
(users.composite_id).tenant_id = (events.composite_id).tenant_id AND
(users.composite_id).user_id = (events.composite_id).user_id AND
users.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND
users.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND
event_type = 'submit')
) AS subquery_1
LEFT JOIN
(SELECT
DISTINCT ON ((composite_id).tenant_id, (composite_id).user_id) composite_id,
(composite_id).tenant_id,
(composite_id).user_id,
'Has done paying'::TEXT AS hasdone
FROM
events
WHERE
events.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND
events.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND
event_type = 'pay') AS subquery_2
ON
subquery_1.tenant_id = subquery_2.tenant_id AND
subquery_1.user_id = subquery_2.user_id
GROUP BY
subquery_1.tenant_id,
subquery_1.user_id,
hasdone) AS subquery_top
GROUP BY
hasdone;
-- Union, left join and having subquery pushdown
EXPLAIN SELECT
avg(array_length(events, 1)) AS event_average,
count_pay
FROM (
SELECT
subquery_1.tenant_id,
subquery_1.user_id,
array_agg(event ORDER BY event_time) AS events,
COALESCE(count_pay, 0) AS count_pay
FROM
(
(SELECT
(users.composite_id).tenant_id,
(users.composite_id).user_id,
'action=>1'AS event,
events.event_time
FROM
users,
events
WHERE
(users.composite_id).tenant_id = (events.composite_id).tenant_id AND
(users.composite_id).user_id = (events.composite_id).user_id AND
users.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND
users.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND
event_type = 'click')
UNION
(SELECT
(users.composite_id).tenant_id,
(users.composite_id).user_id,
'action=>2'AS event,
events.event_time
FROM
users,
events
WHERE
(users.composite_id).tenant_id = (events.composite_id).tenant_id AND
(users.composite_id).user_id = (events.composite_id).user_id AND
users.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND
users.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND
event_type = 'submit')
) AS subquery_1
LEFT JOIN
(SELECT
(composite_id).tenant_id,
(composite_id).user_id,
COUNT(*) AS count_pay
FROM
events
WHERE
events.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND
events.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND
event_type = 'pay'
GROUP BY
tenant_id,
user_id
HAVING
COUNT(*) > 2) AS subquery_2
ON
subquery_1.tenant_id = subquery_2.tenant_id AND
subquery_1.user_id = subquery_2.user_id
GROUP BY
subquery_1.tenant_id,
subquery_1.user_id,
count_pay) AS subquery_top
WHERE
array_ndims(events) > 0
GROUP BY
count_pay
ORDER BY
count_pay;
-- Lateral join subquery pushdown
EXPLAIN SELECT
tenant_id,
user_id,
user_lastseen,
event_array
FROM
(SELECT
tenant_id,
user_id,
max(lastseen) as user_lastseen,
array_agg(event_type ORDER BY event_time) AS event_array
FROM
(SELECT
(composite_id).tenant_id,
(composite_id).user_id,
lastseen
FROM
users
WHERE
composite_id >= '(1, -9223372036854775808)'::user_composite_type AND
composite_id <= '(1, 9223372036854775807)'::user_composite_type
ORDER BY
lastseen DESC
LIMIT
10
) AS subquery_top
LEFT JOIN LATERAL
(SELECT
event_type,
event_time
FROM
events
WHERE
(composite_id).tenant_id = subquery_top.tenant_id AND
(composite_id).user_id = subquery_top.user_id
ORDER BY
event_time DESC
LIMIT
99) AS subquery_lateral
ON
true
GROUP BY
tenant_id,
user_id
) AS shard_union
ORDER BY
user_lastseen DESC
LIMIT
10;
SET citusdb.task_executor_type TO 'real-time';
SET client_min_messages TO NOTICE;

View File

@ -421,3 +421,661 @@ AS foo;
(0 rows)
DROP TABLE subquery_pruning_varchar_test_table;
-- Create composite type to use in subquery pushdown
CREATE TYPE user_composite_type AS
(
tenant_id BIGINT,
user_id BIGINT
);
\c - - - :worker_1_port
CREATE TYPE user_composite_type AS
(
tenant_id BIGINT,
user_id BIGINT
);
\c - - - :worker_2_port
CREATE TYPE user_composite_type AS
(
tenant_id BIGINT,
user_id BIGINT
);
\c - - - :master_port
CREATE TABLE events (
composite_id user_composite_type,
event_id bigint,
event_type character varying(255),
event_time bigint
);
SELECT master_create_distributed_table('events', 'composite_id', 'range');
master_create_distributed_table
---------------------------------
(1 row)
SELECT master_create_empty_shard('events') AS new_shard_id
\gset
UPDATE pg_dist_shard SET shardminvalue = '(1,1)', shardmaxvalue = '(1,2000000000)'
WHERE shardid = :new_shard_id;
SELECT master_create_empty_shard('events') AS new_shard_id
\gset
UPDATE pg_dist_shard SET shardminvalue = '(1,2000000001)', shardmaxvalue = '(1,4300000000)'
WHERE shardid = :new_shard_id;
SELECT master_create_empty_shard('events') AS new_shard_id
\gset
UPDATE pg_dist_shard SET shardminvalue = '(2,1)', shardmaxvalue = '(2,2000000000)'
WHERE shardid = :new_shard_id;
SELECT master_create_empty_shard('events') AS new_shard_id
\gset
UPDATE pg_dist_shard SET shardminvalue = '(2,2000000001)', shardmaxvalue = '(2,4300000000)'
WHERE shardid = :new_shard_id;
\COPY events FROM STDIN WITH CSV
CREATE TABLE users (
composite_id user_composite_type,
lastseen bigint
);
SELECT master_create_distributed_table('users', 'composite_id', 'range');
master_create_distributed_table
---------------------------------
(1 row)
SELECT master_create_empty_shard('users') AS new_shard_id
\gset
UPDATE pg_dist_shard SET shardminvalue = '(1,1)', shardmaxvalue = '(1,2000000000)'
WHERE shardid = :new_shard_id;
SELECT master_create_empty_shard('users') AS new_shard_id
\gset
UPDATE pg_dist_shard SET shardminvalue = '(1,2000000001)', shardmaxvalue = '(1,4300000000)'
WHERE shardid = :new_shard_id;
SELECT master_create_empty_shard('users') AS new_shard_id
\gset
UPDATE pg_dist_shard SET shardminvalue = '(2,1)', shardmaxvalue = '(2,2000000000)'
WHERE shardid = :new_shard_id;
SELECT master_create_empty_shard('users') AS new_shard_id
\gset
UPDATE pg_dist_shard SET shardminvalue = '(2,2000000001)', shardmaxvalue = '(2,4300000000)'
WHERE shardid = :new_shard_id;
\COPY users FROM STDIN WITH CSV
SET citus.subquery_pushdown TO TRUE;
-- Simple join subquery pushdown
SELECT
avg(array_length(events, 1)) AS event_average
FROM
(SELECT
tenant_id,
user_id,
array_agg(event_type ORDER BY event_time) AS events
FROM
(SELECT
(users.composite_id).tenant_id,
(users.composite_id).user_id,
event_type,
events.event_time
FROM
users,
events
WHERE
(users.composite_id).tenant_id = (events.composite_id).tenant_id AND
(users.composite_id).user_id = (events.composite_id).user_id AND
users.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND
users.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND
event_type IN ('click', 'submit', 'pay')) AS subquery
GROUP BY
tenant_id,
user_id) AS subquery;
event_average
--------------------
3.6666666666666667
(1 row)
-- Union and left join subquery pushdown
SELECT
avg(array_length(events, 1)) AS event_average,
hasdone
FROM
(SELECT
subquery_1.tenant_id,
subquery_1.user_id,
array_agg(event ORDER BY event_time) AS events,
COALESCE(hasdone, 'Has not done paying') AS hasdone
FROM
(
(SELECT
(users.composite_id).tenant_id,
(users.composite_id).user_id,
'action=>1'AS event,
events.event_time
FROM
users,
events
WHERE
(users.composite_id).tenant_id = (events.composite_id).tenant_id AND
(users.composite_id).user_id = (events.composite_id).user_id AND
users.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND
users.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND
event_type = 'click')
UNION
(SELECT
(users.composite_id).tenant_id,
(users.composite_id).user_id,
'action=>2'AS event,
events.event_time
FROM
users,
events
WHERE
(users.composite_id).tenant_id = (events.composite_id).tenant_id AND
(users.composite_id).user_id = (events.composite_id).user_id AND
users.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND
users.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND
event_type = 'submit')
) AS subquery_1
LEFT JOIN
(SELECT
DISTINCT ON ((composite_id).tenant_id, (composite_id).user_id) composite_id,
(composite_id).tenant_id,
(composite_id).user_id,
'Has done paying'::TEXT AS hasdone
FROM
events
WHERE
events.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND
events.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND
event_type = 'pay') AS subquery_2
ON
subquery_1.tenant_id = subquery_2.tenant_id AND
subquery_1.user_id = subquery_2.user_id
GROUP BY
subquery_1.tenant_id,
subquery_1.user_id,
hasdone) AS subquery_top
GROUP BY
hasdone;
event_average | hasdone
--------------------+---------------------
4.0000000000000000 | Has not done paying
2.5000000000000000 | Has done paying
(2 rows)
-- Union, left join and having subquery pushdown
SELECT
avg(array_length(events, 1)) AS event_average,
count_pay
FROM (
SELECT
subquery_1.tenant_id,
subquery_1.user_id,
array_agg(event ORDER BY event_time) AS events,
COALESCE(count_pay, 0) AS count_pay
FROM
(
(SELECT
(users.composite_id).tenant_id,
(users.composite_id).user_id,
'action=>1'AS event,
events.event_time
FROM
users,
events
WHERE
(users.composite_id).tenant_id = (events.composite_id).tenant_id AND
(users.composite_id).user_id = (events.composite_id).user_id AND
users.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND
users.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND
event_type = 'click')
UNION
(SELECT
(users.composite_id).tenant_id,
(users.composite_id).user_id,
'action=>2'AS event,
events.event_time
FROM
users,
events
WHERE
(users.composite_id).tenant_id = (events.composite_id).tenant_id AND
(users.composite_id).user_id = (events.composite_id).user_id AND
users.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND
users.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND
event_type = 'submit')
) AS subquery_1
LEFT JOIN
(SELECT
(composite_id).tenant_id,
(composite_id).user_id,
COUNT(*) AS count_pay
FROM
events
WHERE
events.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND
events.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND
event_type = 'pay'
GROUP BY
tenant_id,
user_id
HAVING
COUNT(*) > 2) AS subquery_2
ON
subquery_1.tenant_id = subquery_2.tenant_id AND
subquery_1.user_id = subquery_2.user_id
GROUP BY
subquery_1.tenant_id,
subquery_1.user_id,
count_pay) AS subquery_top
WHERE
array_ndims(events) > 0
GROUP BY
count_pay
ORDER BY
count_pay;
event_average | count_pay
--------------------+-----------
3.0000000000000000 | 0
(1 row)
-- Lateral join subquery pushdown
SELECT
tenant_id,
user_id,
user_lastseen,
event_array
FROM
(SELECT
tenant_id,
user_id,
max(lastseen) as user_lastseen,
array_agg(event_type ORDER BY event_time) AS event_array
FROM
(SELECT
(composite_id).tenant_id,
(composite_id).user_id,
lastseen
FROM
users
WHERE
composite_id >= '(1, -9223372036854775808)'::user_composite_type AND
composite_id <= '(1, 9223372036854775807)'::user_composite_type
ORDER BY
lastseen DESC
LIMIT
10
) AS subquery_top
LEFT JOIN LATERAL
(SELECT
event_type,
event_time
FROM
events
WHERE
(composite_id).tenant_id = subquery_top.tenant_id AND
(composite_id).user_id = subquery_top.user_id
ORDER BY
event_time DESC
LIMIT
99) AS subquery_lateral
ON
true
GROUP BY
tenant_id,
user_id
) AS shard_union
ORDER BY
user_lastseen DESC
LIMIT
10;
tenant_id | user_id | user_lastseen | event_array
-----------+---------+---------------+----------------------------
1 | 1003 | 1472807315 | {click,click,click,submit}
1 | 1002 | 1472807215 | {click,click,submit,pay}
1 | 1001 | 1472807115 | {click,submit,pay}
(3 rows)
-- Same queries above with explain
SET client_min_messages TO DEBUG2;
-- Simple join subquery pushdown
EXPLAIN SELECT
avg(array_length(events, 1)) AS event_average
FROM
(SELECT
tenant_id,
user_id,
array_agg(event_type ORDER BY event_time) AS events
FROM
(SELECT
(users.composite_id).tenant_id,
(users.composite_id).user_id,
event_type,
events.event_time
FROM
users,
events
WHERE
(users.composite_id).tenant_id = (events.composite_id).tenant_id AND
(users.composite_id).user_id = (events.composite_id).user_id AND
users.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND
users.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND
event_type IN ('click', 'submit', 'pay')) AS subquery
GROUP BY
tenant_id,
user_id) AS subquery;
DEBUG: predicate pruning for shardId 270015
DEBUG: predicate pruning for shardId 270016
DEBUG: predicate pruning for shardId 270011
DEBUG: predicate pruning for shardId 270012
QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Distributed Query into pg_merge_job_270014
Executor: Real-Time
Task Count: 2
Tasks Shown: One of 2
-> Task
Node: host=localhost port=57637 dbname=regression
-> Aggregate (cost=40.01..40.02 rows=1 width=32)
-> GroupAggregate (cost=39.89..39.99 rows=1 width=556)
Group Key: ((users.composite_id).tenant_id), ((users.composite_id).user_id)
-> Merge Join (cost=39.89..39.97 rows=1 width=556)
Merge Cond: ((((users.composite_id).tenant_id) = ((events.composite_id).tenant_id)) AND (((users.composite_id).user_id) = ((events.composite_id).user_id)))
-> Sort (cost=28.08..28.09 rows=6 width=32)
Sort Key: ((users.composite_id).tenant_id), ((users.composite_id).user_id)
-> Seq Scan on users_270013 users (cost=0.00..28.00 rows=6 width=32)
Filter: ((composite_id >= '(1,-9223372036854775808)'::user_composite_type) AND (composite_id <= '(1,9223372036854775807)'::user_composite_type))
-> Sort (cost=11.81..11.82 rows=3 width=556)
Sort Key: ((events.composite_id).tenant_id), ((events.composite_id).user_id)
-> Seq Scan on events_270009 events (cost=0.00..11.79 rows=3 width=556)
Filter: ((event_type)::text = ANY ('{click,submit,pay}'::text[]))
Master Query
-> Aggregate (cost=0.01..0.02 rows=1 width=0)
-> Seq Scan on pg_merge_job_270014 (cost=0.00..0.00 rows=0 width=0)
(22 rows)
-- Union and left join subquery pushdown
EXPLAIN SELECT
avg(array_length(events, 1)) AS event_average,
hasdone
FROM
(SELECT
subquery_1.tenant_id,
subquery_1.user_id,
array_agg(event ORDER BY event_time) AS events,
COALESCE(hasdone, 'Has not done paying') AS hasdone
FROM
(
(SELECT
(users.composite_id).tenant_id,
(users.composite_id).user_id,
'action=>1'AS event,
events.event_time
FROM
users,
events
WHERE
(users.composite_id).tenant_id = (events.composite_id).tenant_id AND
(users.composite_id).user_id = (events.composite_id).user_id AND
users.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND
users.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND
event_type = 'click')
UNION
(SELECT
(users.composite_id).tenant_id,
(users.composite_id).user_id,
'action=>2'AS event,
events.event_time
FROM
users,
events
WHERE
(users.composite_id).tenant_id = (events.composite_id).tenant_id AND
(users.composite_id).user_id = (events.composite_id).user_id AND
users.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND
users.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND
event_type = 'submit')
) AS subquery_1
LEFT JOIN
(SELECT
DISTINCT ON ((composite_id).tenant_id, (composite_id).user_id) composite_id,
(composite_id).tenant_id,
(composite_id).user_id,
'Has done paying'::TEXT AS hasdone
FROM
events
WHERE
events.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND
events.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND
event_type = 'pay') AS subquery_2
ON
subquery_1.tenant_id = subquery_2.tenant_id AND
subquery_1.user_id = subquery_2.user_id
GROUP BY
subquery_1.tenant_id,
subquery_1.user_id,
hasdone) AS subquery_top
GROUP BY
hasdone;
DEBUG: predicate pruning for shardId 270015
DEBUG: predicate pruning for shardId 270016
DEBUG: predicate pruning for shardId 270011
DEBUG: predicate pruning for shardId 270012
DEBUG: predicate pruning for shardId 270015
DEBUG: predicate pruning for shardId 270016
DEBUG: predicate pruning for shardId 270011
DEBUG: predicate pruning for shardId 270012
DEBUG: predicate pruning for shardId 270011
DEBUG: predicate pruning for shardId 270012
DEBUG: building index "pg_toast_17247_index" on table "pg_toast_17247"
QUERY PLAN
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Distributed Query into pg_merge_job_270015
Executor: Real-Time
Task Count: 2
Tasks Shown: One of 2
-> Task
Node: host=localhost port=57637 dbname=regression
-> HashAggregate (cost=91.94..91.96 rows=2 width=64)
Group Key: COALESCE(('Has done paying'::text), 'Has not done paying'::text)
-> GroupAggregate (cost=91.85..91.90 rows=2 width=88)
Group Key: ((users.composite_id).tenant_id), ((users.composite_id).user_id), ('Has done paying'::text)
-> Sort (cost=91.85..91.85 rows=2 width=88)
Sort Key: ((users.composite_id).tenant_id), ((users.composite_id).user_id), ('Has done paying'::text)
-> Merge Left Join (cost=91.75..91.84 rows=2 width=88)
Merge Cond: ((((users.composite_id).tenant_id) = ((events_2.composite_id).tenant_id)) AND (((users.composite_id).user_id) = ((events_2.composite_id).user_id)))
-> Unique (cost=79.46..79.48 rows=2 width=40)
-> Sort (cost=79.46..79.47 rows=2 width=40)
Sort Key: ((users.composite_id).tenant_id), ((users.composite_id).user_id), ('action=>1'::text), events.event_time
-> Append (cost=0.00..79.45 rows=2 width=40)
-> Nested Loop (cost=0.00..39.72 rows=1 width=40)
Join Filter: (((users.composite_id).tenant_id = (events.composite_id).tenant_id) AND ((users.composite_id).user_id = (events.composite_id).user_id))
-> Seq Scan on events_270009 events (cost=0.00..11.62 rows=1 width=40)
Filter: ((event_type)::text = 'click'::text)
-> Seq Scan on users_270013 users (cost=0.00..28.00 rows=6 width=32)
Filter: ((composite_id >= '(1,-9223372036854775808)'::user_composite_type) AND (composite_id <= '(1,9223372036854775807)'::user_composite_type))
-> Nested Loop (cost=0.00..39.72 rows=1 width=40)
Join Filter: (((users_1.composite_id).tenant_id = (events_1.composite_id).tenant_id) AND ((users_1.composite_id).user_id = (events_1.composite_id).user_id))
-> Seq Scan on events_270009 events_1 (cost=0.00..11.62 rows=1 width=40)
Filter: ((event_type)::text = 'submit'::text)
-> Seq Scan on users_270013 users_1 (cost=0.00..28.00 rows=6 width=32)
Filter: ((composite_id >= '(1,-9223372036854775808)'::user_composite_type) AND (composite_id <= '(1,9223372036854775807)'::user_composite_type))
-> Materialize (cost=12.29..12.31 rows=1 width=48)
-> Unique (cost=12.29..12.30 rows=1 width=32)
-> Sort (cost=12.29..12.29 rows=1 width=32)
Sort Key: ((events_2.composite_id).tenant_id), ((events_2.composite_id).user_id)
-> Seq Scan on events_270009 events_2 (cost=0.00..12.28 rows=1 width=32)
Filter: ((composite_id >= '(1,-9223372036854775808)'::user_composite_type) AND (composite_id <= '(1,9223372036854775807)'::user_composite_type) AND ((event_type)::text = 'pay'::text))
Master Query
-> HashAggregate (cost=0.00..0.18 rows=10 width=0)
Group Key: intermediate_column_270015_2
-> Seq Scan on pg_merge_job_270015 (cost=0.00..0.00 rows=0 width=0)
(40 rows)
-- Union, left join and having subquery pushdown
EXPLAIN SELECT
avg(array_length(events, 1)) AS event_average,
count_pay
FROM (
SELECT
subquery_1.tenant_id,
subquery_1.user_id,
array_agg(event ORDER BY event_time) AS events,
COALESCE(count_pay, 0) AS count_pay
FROM
(
(SELECT
(users.composite_id).tenant_id,
(users.composite_id).user_id,
'action=>1'AS event,
events.event_time
FROM
users,
events
WHERE
(users.composite_id).tenant_id = (events.composite_id).tenant_id AND
(users.composite_id).user_id = (events.composite_id).user_id AND
users.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND
users.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND
event_type = 'click')
UNION
(SELECT
(users.composite_id).tenant_id,
(users.composite_id).user_id,
'action=>2'AS event,
events.event_time
FROM
users,
events
WHERE
(users.composite_id).tenant_id = (events.composite_id).tenant_id AND
(users.composite_id).user_id = (events.composite_id).user_id AND
users.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND
users.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND
event_type = 'submit')
) AS subquery_1
LEFT JOIN
(SELECT
(composite_id).tenant_id,
(composite_id).user_id,
COUNT(*) AS count_pay
FROM
events
WHERE
events.composite_id >= '(1, -9223372036854775808)'::user_composite_type AND
events.composite_id <= '(1, 9223372036854775807)'::user_composite_type AND
event_type = 'pay'
GROUP BY
tenant_id,
user_id
HAVING
COUNT(*) > 2) AS subquery_2
ON
subquery_1.tenant_id = subquery_2.tenant_id AND
subquery_1.user_id = subquery_2.user_id
GROUP BY
subquery_1.tenant_id,
subquery_1.user_id,
count_pay) AS subquery_top
WHERE
array_ndims(events) > 0
GROUP BY
count_pay
ORDER BY
count_pay;
DEBUG: predicate pruning for shardId 270015
DEBUG: predicate pruning for shardId 270016
DEBUG: predicate pruning for shardId 270011
DEBUG: predicate pruning for shardId 270012
DEBUG: predicate pruning for shardId 270015
DEBUG: predicate pruning for shardId 270016
DEBUG: predicate pruning for shardId 270011
DEBUG: predicate pruning for shardId 270012
DEBUG: predicate pruning for shardId 270011
DEBUG: predicate pruning for shardId 270012
ERROR: bogus varattno for OUTER_VAR var: 3
-- Lateral join subquery pushdown
EXPLAIN SELECT
tenant_id,
user_id,
user_lastseen,
event_array
FROM
(SELECT
tenant_id,
user_id,
max(lastseen) as user_lastseen,
array_agg(event_type ORDER BY event_time) AS event_array
FROM
(SELECT
(composite_id).tenant_id,
(composite_id).user_id,
lastseen
FROM
users
WHERE
composite_id >= '(1, -9223372036854775808)'::user_composite_type AND
composite_id <= '(1, 9223372036854775807)'::user_composite_type
ORDER BY
lastseen DESC
LIMIT
10
) AS subquery_top
LEFT JOIN LATERAL
(SELECT
event_type,
event_time
FROM
events
WHERE
(composite_id).tenant_id = subquery_top.tenant_id AND
(composite_id).user_id = subquery_top.user_id
ORDER BY
event_time DESC
LIMIT
99) AS subquery_lateral
ON
true
GROUP BY
tenant_id,
user_id
) AS shard_union
ORDER BY
user_lastseen DESC
LIMIT
10;
DEBUG: push down of limit count: 10
DEBUG: predicate pruning for shardId 270015
DEBUG: predicate pruning for shardId 270016
DEBUG: predicate pruning for shardId 270011
DEBUG: predicate pruning for shardId 270012
DEBUG: building index "pg_toast_17256_index" on table "pg_toast_17256"
QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Distributed Query into pg_merge_job_270017
Executor: Real-Time
Task Count: 2
Tasks Shown: One of 2
-> Task
Node: host=localhost port=57637 dbname=regression
-> Limit (cost=100.43..100.44 rows=6 width=56)
-> Sort (cost=100.43..100.44 rows=6 width=56)
Sort Key: (max(users.lastseen)) DESC
-> GroupAggregate (cost=100.14..100.29 rows=6 width=548)
Group Key: ((users.composite_id).tenant_id), ((users.composite_id).user_id)
-> Sort (cost=100.14..100.16 rows=6 width=548)
Sort Key: ((users.composite_id).tenant_id), ((users.composite_id).user_id)
-> Nested Loop Left Join (cost=40.04..100.06 rows=6 width=548)
-> Limit (cost=28.08..28.09 rows=6 width=40)
-> Sort (cost=28.08..28.09 rows=6 width=40)
Sort Key: users.lastseen DESC
-> Seq Scan on users_270013 users (cost=0.00..28.00 rows=6 width=40)
Filter: ((composite_id >= '(1,-9223372036854775808)'::user_composite_type) AND (composite_id <= '(1,9223372036854775807)'::user_composite_type))
-> Limit (cost=11.96..11.96 rows=1 width=524)
-> Sort (cost=11.96..11.96 rows=1 width=524)
Sort Key: events.event_time DESC
-> Seq Scan on events_270009 events (cost=0.00..11.95 rows=1 width=524)
Filter: (((composite_id).tenant_id = ((users.composite_id).tenant_id)) AND ((composite_id).user_id = ((users.composite_id).user_id)))
Master Query
-> Limit (cost=0.01..0.02 rows=0 width=0)
-> Sort (cost=0.01..0.02 rows=0 width=0)
Sort Key: intermediate_column_270017_2 DESC
-> Seq Scan on pg_merge_job_270017 (cost=0.00..0.00 rows=0 width=0)
(29 rows)
SET citusdb.task_executor_type TO 'real-time';
SET client_min_messages TO NOTICE;