Merge pull request #1668 from citusdata/window_function_preliminary_implementation

Add window function support for SUBQUERY PUSHDOWN and INSERT INTO SELECT
pull/1699/head
Mehmet Furkan ŞAHİN 2017-10-04 17:18:26 +03:00 committed by GitHub
commit e202c51fec
15 changed files with 4091 additions and 627 deletions

View File

@ -776,6 +776,7 @@ MultiTaskRouterSelectQuerySupported(Query *query)
{ {
List *queryList = NIL; List *queryList = NIL;
ListCell *queryCell = NULL; ListCell *queryCell = NULL;
StringInfo errorDetail = NULL;
ExtractQueryWalker((Node *) query, &queryList); ExtractQueryWalker((Node *) query, &queryList);
foreach(queryCell, queryList) foreach(queryCell, queryList)
@ -797,7 +798,7 @@ MultiTaskRouterSelectQuerySupported(Query *query)
if (subquery->limitCount != NULL) if (subquery->limitCount != NULL)
{ {
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"LIMIT clauses are not allowed in distirbuted INSERT " "LIMIT clauses are not allowed in distributed INSERT "
"... SELECT queries", "... SELECT queries",
NULL, NULL); NULL, NULL);
} }
@ -811,18 +812,35 @@ MultiTaskRouterSelectQuerySupported(Query *query)
NULL, NULL); NULL, NULL);
} }
/* /* group clause list must include partition column */
* We could potentially support window clauses where the data is partitioned if (subquery->groupClause)
* over distribution column. For simplicity, we currently do not support window {
* clauses at all. List *groupClauseList = subquery->groupClause;
*/ List *targetEntryList = subquery->targetList;
if (subquery->windowClause != NULL) List *groupTargetEntryList = GroupTargetEntryList(groupClauseList,
targetEntryList);
bool groupOnPartitionColumn = TargetListOnPartitionColumn(subquery,
groupTargetEntryList);
if (!groupOnPartitionColumn)
{ {
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"window functions are not allowed in distributed " "Group by list without distribution column is "
"INSERT ... SELECT queries", "not allowed in distributed INSERT ... "
"SELECT queries",
NULL, NULL); NULL, NULL);
} }
}
/*
* We support window functions when the window function
* is partitioned on distribution column.
*/
if (subquery->windowClause && !SafeToPushdownWindowFunction(subquery,
&errorDetail))
{
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, errorDetail->data, NULL,
NULL);
}
if (subquery->setOperations != NULL) if (subquery->setOperations != NULL)
{ {

View File

@ -83,7 +83,7 @@ static DeferredErrorMessage * DeferErrorIfUnsupportedUnionQuery(Query *queryTree
outerMostQueryHasLimit); outerMostQueryHasLimit);
static bool ExtractSetOperationStatmentWalker(Node *node, List **setOperationList); static bool ExtractSetOperationStatmentWalker(Node *node, List **setOperationList);
static DeferredErrorMessage * DeferErrorIfUnsupportedTableCombination(Query *queryTree); static DeferredErrorMessage * DeferErrorIfUnsupportedTableCombination(Query *queryTree);
static bool TargetListOnPartitionColumn(Query *query, List *targetEntryList); static bool WindowPartitionOnDistributionColumn(Query *query);
static FieldSelect * CompositeFieldRecursive(Expr *expression, Query *query); static FieldSelect * CompositeFieldRecursive(Expr *expression, Query *query);
static bool FullCompositeFieldList(List *compositeFieldList); static bool FullCompositeFieldList(List *compositeFieldList);
static MultiNode * MultiPlanTree(Query *queryTree); static MultiNode * MultiPlanTree(Query *queryTree);
@ -447,6 +447,7 @@ MultiSubqueryPlanTree(Query *originalQuery, Query *queryTree,
* - Only a single RTE_RELATION exists, which means only a single table * - Only a single RTE_RELATION exists, which means only a single table
* name is specified on the whole query * name is specified on the whole query
* - No sublinks exists in the subquery * - No sublinks exists in the subquery
* - No window functions in the subquery
* *
* Note that the caller should still call DeferErrorIfUnsupportedSubqueryRepartition() * Note that the caller should still call DeferErrorIfUnsupportedSubqueryRepartition()
* to ensure that Citus supports the subquery. Also, this function is designed to run * to ensure that Citus supports the subquery. Also, this function is designed to run
@ -466,6 +467,12 @@ SingleRelationRepartitionSubquery(Query *queryTree)
return false; return false;
} }
/* we don't support window functions */
if (queryTree->hasWindowFuncs)
{
return false;
}
/* /*
* Don't allow joins and set operations. If join appears in the queryTree, the * Don't allow joins and set operations. If join appears in the queryTree, the
* length would be greater than 1. If only set operations exists, the length * length would be greater than 1. If only set operations exists, the length
@ -782,6 +789,7 @@ DeferErrorIfCannotPushdownSubquery(Query *subqueryTree, bool outerMostQueryHasLi
{ {
bool preconditionsSatisfied = true; bool preconditionsSatisfied = true;
char *errorDetail = NULL; char *errorDetail = NULL;
StringInfo errorInfo = NULL;
DeferredErrorMessage *deferredError = NULL; DeferredErrorMessage *deferredError = NULL;
deferredError = DeferErrorIfUnsupportedTableCombination(subqueryTree); deferredError = DeferErrorIfUnsupportedTableCombination(subqueryTree);
@ -796,12 +804,6 @@ DeferErrorIfCannotPushdownSubquery(Query *subqueryTree, bool outerMostQueryHasLi
errorDetail = "Subqueries without relations are unsupported"; errorDetail = "Subqueries without relations are unsupported";
} }
if (subqueryTree->hasWindowFuncs)
{
preconditionsSatisfied = false;
errorDetail = "Window functions are currently unsupported";
}
if (subqueryTree->limitOffset) if (subqueryTree->limitOffset)
{ {
preconditionsSatisfied = false; preconditionsSatisfied = false;
@ -871,6 +873,17 @@ DeferErrorIfCannotPushdownSubquery(Query *subqueryTree, bool outerMostQueryHasLi
} }
} }
/*
* We support window functions when the window function
* is partitioned on distribution column.
*/
if (subqueryTree->windowClause && !SafeToPushdownWindowFunction(subqueryTree,
&errorInfo))
{
errorDetail = (char *) errorInfo->data;
preconditionsSatisfied = false;
}
/* we don't support aggregates without group by */ /* we don't support aggregates without group by */
if (subqueryTree->hasAggs && (subqueryTree->groupClause == NULL)) if (subqueryTree->hasAggs && (subqueryTree->groupClause == NULL))
{ {
@ -1083,11 +1096,91 @@ DeferErrorIfUnsupportedTableCombination(Query *queryTree)
} }
/*
 * SafeToPushdownWindowFunction decides whether the window clauses of the
 * given query can be pushed down.
 *
 * The check runs in two phases. First, every window clause must carry a
 * PARTITION BY list. Only when all of them do, the partition lists are
 * validated through WindowPartitionOnDistributionColumn().
 *
 * On failure the function allocates a fresh StringInfo, stores it in
 * *errorDetail with the reason filled in, and returns false. On success it
 * returns true and leaves *errorDetail untouched.
 */
bool
SafeToPushdownWindowFunction(Query *query, StringInfo *errorDetail)
{
	const char *unsupportedReason = NULL;
	ListCell *clauseCell = NULL;

	/* phase one: a single window clause without PARTITION BY is enough to fail */
	foreach(clauseCell, query->windowClause)
	{
		WindowClause *clause = (WindowClause *) lfirst(clauseCell);

		if (!clause->partitionClause)
		{
			unsupportedReason =
				"Window functions without PARTITION BY on distribution "
				"column is currently unsupported";
			break;
		}
	}

	/* phase two: only reached when every clause has a PARTITION BY list */
	if (unsupportedReason == NULL && !WindowPartitionOnDistributionColumn(query))
	{
		unsupportedReason =
			"Window functions with PARTITION BY list missing distribution "
			"column is currently unsupported";
	}

	if (unsupportedReason != NULL)
	{
		*errorDetail = makeStringInfo();
		appendStringInfoString(*errorDetail, unsupportedReason);

		return false;
	}

	return true;
}
/*
 * WindowPartitionOnDistributionColumn returns true when, for every window
 * clause of the given query, the target entries referenced by its
 * PARTITION BY list are accepted by TargetListOnPartitionColumn(). It
 * returns false as soon as one window clause fails that check.
 *
 * A query without any window clause trivially yields true, since there is
 * nothing to iterate over.
 */
static bool
WindowPartitionOnDistributionColumn(Query *query)
{
	ListCell *clauseCell = NULL;

	foreach(clauseCell, query->windowClause)
	{
		WindowClause *clause = (WindowClause *) lfirst(clauseCell);

		/* resolve the PARTITION BY references into target list entries */
		List *partitionEntryList = GroupTargetEntryList(clause->partitionClause,
														query->targetList);

		if (!TargetListOnPartitionColumn(query, partitionEntryList))
		{
			return false;
		}
	}

	return true;
}
/* /*
* TargetListOnPartitionColumn checks if at least one target list entry is on * TargetListOnPartitionColumn checks if at least one target list entry is on
* partition column. * partition column.
*/ */
static bool bool
TargetListOnPartitionColumn(Query *query, List *targetEntryList) TargetListOnPartitionColumn(Query *query, List *targetEntryList)
{ {
bool targetListOnPartitionColumn = false; bool targetListOnPartitionColumn = false;

View File

@ -185,6 +185,8 @@ extern MultiTreeRoot * MultiLogicalPlanCreate(Query *originalQuery, Query *query
PlannerRestrictionContext * PlannerRestrictionContext *
plannerRestrictionContext, plannerRestrictionContext,
ParamListInfo boundParams); ParamListInfo boundParams);
extern bool SafeToPushdownWindowFunction(Query *query, StringInfo *errorDetail);
extern bool TargetListOnPartitionColumn(Query *query, List *targetEntryList);
extern bool NeedsDistributedPlanning(Query *queryTree); extern bool NeedsDistributedPlanning(Query *queryTree);
extern MultiNode * ParentNode(MultiNode *multiNode); extern MultiNode * ParentNode(MultiNode *multiNode);
extern MultiNode * ChildNode(MultiUnaryNode *multiNode); extern MultiNode * ChildNode(MultiUnaryNode *multiNode);

View File

@ -1015,8 +1015,7 @@ FROM (SELECT SUM(raw_events_second.value_4) AS v4,
raw_events_second raw_events_second
WHERE raw_events_first.user_id = raw_events_second.user_id WHERE raw_events_first.user_id = raw_events_second.user_id
GROUP BY raw_events_second.value_3) AS foo; GROUP BY raw_events_second.value_3) AS foo;
DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match DEBUG: Group by list without distribution column is not allowed in distributed INSERT ... SELECT queries
DETAIL: The data type of the target table's partition column should exactly match the data type of the corresponding simple column reference in the subquery.
DEBUG: Collecting INSERT ... SELECT results on coordinator DEBUG: Collecting INSERT ... SELECT results on coordinator
ERROR: cannot push down this subquery ERROR: cannot push down this subquery
DETAIL: Group by list without partition column is currently unsupported DETAIL: Group by list without partition column is currently unsupported
@ -1133,9 +1132,7 @@ FROM (SELECT SUM(raw_events_second.value_4) AS v4,
GROUP BY raw_events_second.value_1 GROUP BY raw_events_second.value_1
HAVING SUM(raw_events_second.value_4) > 10) AS foo2 ) as f2 HAVING SUM(raw_events_second.value_4) > 10) AS foo2 ) as f2
ON (f.id = f2.id); ON (f.id = f2.id);
DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match DEBUG: Group by list without distribution column is not allowed in distributed INSERT ... SELECT queries
DETAIL: Subquery contains an expression that is not a simple column reference in the same position as the target table's partition column.
HINT: Ensure the target table's partition column has a corresponding simple column reference to a distributed table's partition column in the subquery.
DEBUG: Collecting INSERT ... SELECT results on coordinator DEBUG: Collecting INSERT ... SELECT results on coordinator
ERROR: cannot pushdown the subquery since all relations are not joined using distribution keys ERROR: cannot pushdown the subquery since all relations are not joined using distribution keys
DETAIL: Each relation should be joined with at least one another relation using distribution keys and equality operator. DETAIL: Each relation should be joined with at least one another relation using distribution keys and equality operator.
@ -1164,8 +1161,10 @@ FROM (SELECT SUM(raw_events_second.value_4) AS v4,
GROUP BY raw_events_second.value_1 GROUP BY raw_events_second.value_1
HAVING SUM(raw_events_second.value_4) > 10) AS foo2 ) as f2 HAVING SUM(raw_events_second.value_4) > 10) AS foo2 ) as f2
ON (f.id = f2.id); ON (f.id = f2.id);
ERROR: cannot perform distributed planning for the given modification DEBUG: Group by list without distribution column is not allowed in distributed INSERT ... SELECT queries
DETAIL: Select query cannot be pushed down to the worker. DEBUG: Collecting INSERT ... SELECT results on coordinator
ERROR: cannot pushdown the subquery since all relations are not joined using distribution keys
DETAIL: Each relation should be joined with at least one another relation using distribution keys and equality operator.
-- cannot pushdown the query since the JOIN is not equi JOIN -- cannot pushdown the query since the JOIN is not equi JOIN
INSERT INTO agg_events INSERT INTO agg_events
(user_id, value_4_agg) (user_id, value_4_agg)

View File

@ -0,0 +1,855 @@
-- ===================================================================
-- test insert select functionality for window functions
-- ===================================================================
TRUNCATE agg_results;
INSERT INTO agg_results (user_id, agg_time, value_2_agg)
SELECT
user_id, time, rnk
FROM
(
SELECT
*, rank() OVER my_win as rnk
FROM
events_table
WINDOW my_win AS (PARTITION BY user_id ORDER BY time DESC)
) as foo;
-- get some statistics from the aggregated results to ensure the results are correct
SELECT count(*), count(DISTINCT user_id), avg(user_id) FROM agg_results;
count | count | avg
-------+-------+---------------------
10001 | 101 | 49.5810418958104190
(1 row)
TRUNCATE agg_results;
-- the same test with different syntax
INSERT INTO agg_results (user_id, agg_time, value_2_agg)
SELECT
user_id, time, rnk
FROM
(
SELECT
*, rank() OVER (PARTITION BY user_id ORDER BY time DESC) as rnk
FROM
events_table
) as foo;
-- get some statistics from the aggregated results to ensure the results are correct
SELECT count(*), count(DISTINCT user_id), avg(user_id) FROM agg_results;
count | count | avg
-------+-------+---------------------
10001 | 101 | 49.5810418958104190
(1 row)
TRUNCATE agg_results;
-- similar test with lag
INSERT INTO agg_results (user_id, agg_time, value_2_agg, value_3_agg)
SELECT
user_id, time, lag_event_type, row_no
FROM
(
SELECT
*, lag(event_type) OVER my_win as lag_event_type, row_number() OVER my_win as row_no
FROM
events_table WINDOW my_win AS (PARTITION BY user_id ORDER BY time DESC)
) as foo;
-- get some statistics from the aggregated results to ensure the results are correct
SELECT count(*), count(DISTINCT user_id), avg(user_id) FROM agg_results;
count | count | avg
-------+-------+---------------------
10001 | 101 | 49.5810418958104190
(1 row)
TRUNCATE agg_results;
-- simple window function, partitioned and grouped by on the distribution key
INSERT INTO agg_results (user_id, value_1_agg, value_2_agg)
SELECT
user_id, rnk, tme
FROM
(
SELECT
user_id, rank() OVER my_win as rnk, avg(value_2) as tme
FROM
events_table
GROUP BY
user_id, date_trunc('day', time)
WINDOW my_win AS (PARTITION BY user_id ORDER BY avg(event_type) DESC)
) as foo;
-- get some statistics from the aggregated results to ensure the results are correct
SELECT count(*), count(DISTINCT user_id), avg(user_id) FROM agg_results;
count | count | avg
-------+-------+---------------------
1188 | 101 | 49.7895622895622896
(1 row)
TRUNCATE agg_results;
-- top level query has a group by on the result of the window function
INSERT INTO agg_results (user_id, agg_time, value_2_agg)
SELECT
min(user_id), min(time), lag_event_type
FROM
(
SELECT
*, lag(event_type) OVER my_win as lag_event_type
FROM
events_table WINDOW my_win AS (PARTITION BY user_id ORDER BY time DESC)
) as foo
GROUP BY
lag_event_type;
-- get some statistics from the aggregated results to ensure the results are correct
SELECT count(*), count(DISTINCT user_id), avg(user_id) FROM agg_results;
count | count | avg
-------+-------+--------------------
1002 | 50 | 9.7844311377245509
(1 row)
TRUNCATE agg_results;
-- window functions should work along with joins as well
INSERT INTO agg_results (user_id, value_1_agg, value_2_agg)
SELECT * FROM
(
SELECT
DISTINCT users_table.user_id, lag(users_table.user_id) OVER w1, rank() OVER w1
FROM
users_table, events_table
WHERE
users_table.user_id = events_table.user_id and
event_type < 25
WINDOW w1 AS (PARTITION BY users_table.user_id, events_table.event_type ORDER BY events_table.time)
) as foo;
-- get some statistics from the aggregated results to ensure the results are correct
SELECT count(*), count(DISTINCT user_id), avg(user_id) FROM agg_results;
count | count | avg
-------+-------+---------------------
195 | 91 | 51.0205128205128205
(1 row)
TRUNCATE agg_results;
-- two window functions in a single subquery should work fine as well
INSERT INTO agg_results (user_id, value_1_agg, value_2_agg)
SELECT * FROM
(
SELECT
DISTINCT users_table.user_id, lag(users_table.user_id) OVER w1, rank() OVER w2
FROM
users_table, events_table
WHERE
users_table.user_id = events_table.user_id and
event_type < 25
WINDOW w1 AS (PARTITION BY users_table.user_id, events_table.event_type ORDER BY events_table.time),
w2 AS (PARTITION BY users_table.user_id, (events_table.value_2 % 25) ORDER BY events_table.time)
) as foo;
-- get some statistics from the aggregated results to ensure the results are correct
SELECT count(*), count(DISTINCT user_id), avg(user_id) FROM agg_results;
count | count | avg
-------+-------+---------------------
202 | 91 | 50.2970297029702970
(1 row)
TRUNCATE agg_results;
-- window functions should be fine within subquery joins
INSERT INTO agg_results (user_id, value_1_agg, value_2_agg, value_3_agg)
SELECT sub_1.user_id, max(lag_1), max(rank_1), max(rank_2) FROM
(
SELECT
DISTINCT users_table.user_id, lag(users_table.user_id) OVER w1 as lag_1, rank() OVER w2 as rank_1
FROM
users_table, events_table
WHERE
users_table.user_id = events_table.user_id and
event_type < 25
WINDOW w1 AS (PARTITION BY users_table.user_id, events_table.event_type ORDER BY events_table.time),
w2 AS (PARTITION BY users_table.user_id, (events_table.value_2 % 25) ORDER BY events_table.time)
) as sub_1
JOIN
(
SELECT
DISTINCT users_table.user_id, lag(users_table.user_id) OVER w1 as lag_2, rank() OVER w2 as rank_2
FROM
users_table, events_table
WHERE
users_table.user_id = events_table.user_id and
event_type < 25
WINDOW w1 AS (PARTITION BY users_table.user_id, events_table.value_2 ORDER BY events_table.time),
w2 AS (PARTITION BY users_table.user_id, (events_table.value_2 % 50) ORDER BY events_table.time)
) as sub_2
ON(sub_1.user_id = sub_2.user_id)
GROUP BY
sub_1.user_id;
-- get some statistics from the aggregated results to ensure the results are correct
SELECT count(*), count(DISTINCT user_id), avg(user_id) FROM agg_results;
count | count | avg
-------+-------+---------------------
91 | 91 | 50.2637362637362637
(1 row)
TRUNCATE agg_results;
-- GROUP BYs and PARTITION BYs should work fine together
INSERT INTO agg_results (user_id, agg_time, value_2_agg)
SELECT
avg(user_id), max(time), my_rank
FROM
(
SELECT
user_id, date_trunc('day', time) as time, rank() OVER my_win as my_rank
FROM
events_table
GROUP BY
user_id, date_trunc('day', time)
WINDOW my_win AS (PARTITION BY user_id ORDER BY count(*) DESC)
) as foo
WHERE
my_rank > 5
GROUP BY
my_rank;
-- get some statistics from the aggregated results to ensure the results are correct
SELECT count(*), count(DISTINCT user_id), avg(user_id) FROM agg_results;
count | count | avg
-------+-------+---------------------
7 | 6 | 50.0000000000000000
(1 row)
TRUNCATE agg_results;
-- aggregates in the PARTITION BY are also allowed
INSERT INTO agg_results (user_id, agg_time, value_2_agg)
SELECT
avg(user_id), max(time), my_rank
FROM
(
SELECT
user_id, date_trunc('day', time) as time, rank() OVER my_win as my_rank
FROM
events_table
GROUP BY
user_id, date_trunc('day', time)
WINDOW my_win AS (PARTITION BY user_id, avg(event_type%10)::int ORDER BY count(*) DESC)
) as foo
WHERE
my_rank > 0
GROUP BY
my_rank;
-- get some statistics from the aggregated results to ensure the results are correct
SELECT count(*), count(DISTINCT user_id), avg(user_id) FROM agg_results;
count | count | avg
-------+-------+---------------------
8 | 7 | 48.8750000000000000
(1 row)
TRUNCATE agg_results;
-- GROUP BY should not necessarily be inclusive of partitioning
-- but this query doesn't make much sense
INSERT INTO agg_results (user_id, value_1_agg)
SELECT
avg(user_id), my_rank
FROM
(
SELECT
user_id, rank() OVER my_win as my_rank
FROM
events_table
GROUP BY
user_id
WINDOW my_win AS (PARTITION BY user_id, max(event_type) ORDER BY count(*) DESC)
) as foo
GROUP BY
my_rank;
-- get some statistics from the aggregated results to ensure the results are correct
SELECT count(*), count(DISTINCT user_id), avg(user_id) FROM agg_results;
count | count | avg
-------+-------+---------------------
1 | 1 | 50.0000000000000000
(1 row)
TRUNCATE agg_results;
-- Group by has more columns than partition by which uses coordinator insert ... select
INSERT INTO agg_results(user_id, value_2_agg)
SELECT * FROM (
SELECT
DISTINCT user_id, SUM(value_2) OVER (PARTITION BY user_id)
FROM
users_table
GROUP BY
user_id, value_1, value_2
) a
ORDER BY
2 DESC, 1
LIMIT
10;
-- get some statistics from the aggregated results to ensure the results are correct
SELECT count(*), count(DISTINCT user_id), avg(user_id) FROM agg_results;
count | count | avg
-------+-------+---------------------
10 | 10 | 49.1000000000000000
(1 row)
TRUNCATE agg_results;
INSERT INTO agg_results(user_id, value_2_agg)
SELECT user_id, max(sum) FROM (
SELECT
user_id, SUM(value_2) OVER (PARTITION BY user_id, value_1)
FROM
users_table
GROUP BY
user_id, value_1, value_2
) a
GROUP BY user_id;
-- get some statistics from the aggregated results to ensure the results are correct
SELECT count(*), count(DISTINCT user_id), avg(user_id) FROM agg_results;
count | count | avg
-------+-------+---------------------
101 | 101 | 50.0000000000000000
(1 row)
TRUNCATE agg_results;
-- Subquery in where with window function
INSERT INTO agg_results(user_id)
SELECT
user_id
FROM
users_table
WHERE
value_2 > 545 AND
value_2 < ALL (
SELECT
avg(value_3) OVER (PARTITION BY user_id)
FROM
events_table
WHERE
users_table.user_id = events_table.user_id
)
GROUP BY
user_id;
-- get some statistics from the aggregated results to ensure the results are correct
SELECT count(*), count(DISTINCT user_id), avg(user_id) FROM agg_results;
count | count | avg
-------+-------+---------------------
4 | 4 | 35.2500000000000000
(1 row)
TRUNCATE agg_results;
-- Partition by with aggregate functions. This query does not make much sense since the
-- result of aggregate function will be the same for every row in a partition and it is
-- not going to affect the group that the count function will work on.
INSERT INTO agg_results(user_id, value_2_agg)
SELECT * FROM (
SELECT
user_id, COUNT(*) OVER (PARTITION BY user_id, MIN(value_2))
FROM
users_table
GROUP BY
1
) a;
-- get some statistics from the aggregated results to ensure the results are correct
SELECT count(*), count(DISTINCT user_id), avg(user_id) FROM agg_results;
count | count | avg
-------+-------+---------------------
101 | 101 | 50.0000000000000000
(1 row)
TRUNCATE agg_results;
-- Some more nested queries
INSERT INTO agg_results(user_id, value_2_agg, value_3_agg, value_4_agg)
SELECT
user_id, rank, SUM(ABS(value_2 - value_3)) AS difference, COUNT(*) AS distinct_users
FROM (
SELECT
*, rank() OVER (PARTITION BY user_id ORDER BY value_2 DESC)
FROM (
SELECT
user_id, value_2, sum(value_3) OVER (PARTITION BY user_id, value_2) as value_3
FROM users_table
) AS A
) AS A
GROUP BY
user_id, rank;
-- get some statistics from the aggregated results to ensure the results are correct
SELECT count(*), count(DISTINCT user_id), avg(user_id) FROM agg_results;
count | count | avg
-------+-------+---------------------
9501 | 101 | 49.8461214608988528
(1 row)
TRUNCATE agg_results;
INSERT INTO agg_results(user_id, value_1_agg)
SELECT * FROM (
SELECT DISTINCT
f3.user_id, ABS(f2.sum - f3.sum)
FROM (
SELECT DISTINCT
user_id, sum(value_3) OVER (PARTITION BY user_id)
FROM
users_table
GROUP BY
user_id, value_3
) f3,
(
SELECT DISTINCT
user_id, sum(value_2) OVER (PARTITION BY user_id)
FROM
users_table
GROUP BY
user_id, value_2
) f2
WHERE
f3.user_id=f2.user_id
) a;
-- get some statistics from the aggregated results to ensure the results are correct
SELECT count(*), count(DISTINCT user_id), avg(user_id) FROM agg_results;
count | count | avg
-------+-------+---------------------
101 | 101 | 50.0000000000000000
(1 row)
TRUNCATE agg_results;
-- test with reference table partitioned on columns from both
INSERT INTO agg_results(user_id, value_1_agg)
SELECT *
FROM
(
SELECT
DISTINCT user_id, count(id) OVER (PARTITION BY user_id, id)
FROM
users_table, users_ref_test_table
) a;
-- get some statistics from the aggregated results to ensure the results are correct
SELECT count(*), count(DISTINCT user_id), avg(user_id) FROM agg_results;
count | count | avg
-------+-------+---------------------
101 | 101 | 50.0000000000000000
(1 row)
TRUNCATE agg_results;
-- Window functions with HAVING clause
INSERT INTO agg_results (user_id, value_1_agg)
SELECT * FROM (
SELECT
DISTINCT user_id, rank() OVER (PARTITION BY user_id ORDER BY value_1)
FROM
users_table
GROUP BY
user_id, value_1 HAVING count(*) > 1
) a;
-- get some statistics from the aggregated results to ensure the results are correct
SELECT count(*), count(DISTINCT user_id), avg(user_id) FROM agg_results;
count | count | avg
-------+-------+---------------------
437 | 100 | 49.9496567505720824
(1 row)
TRUNCATE agg_results;
-- Window functions with HAVING clause which uses coordinator insert ... select
INSERT INTO agg_results (user_id, value_1_agg)
SELECT * FROM (
SELECT
DISTINCT user_id, rank() OVER (PARTITION BY user_id ORDER BY value_1)
FROM
users_table
GROUP BY
user_id, value_1 HAVING count(*) > 1
) a
ORDER BY
2 DESC, 1
LIMIT
10;
-- get some statistics from the aggregated results to ensure the results are correct
SELECT count(*), count(DISTINCT user_id), avg(user_id) FROM agg_results;
count | count | avg
-------+-------+---------------------
10 | 5 | 32.4000000000000000
(1 row)
TRUNCATE agg_results;
-- Window function in View works
-- The view groups users_table by (user_id, value_1), keeps only the groups
-- that contain more than one row, and ranks the remaining value_1 groups
-- within each user_id. The window is partitioned by user_id.
CREATE VIEW view_with_window_func AS
SELECT
DISTINCT user_id, rank() OVER (PARTITION BY user_id ORDER BY value_1)
FROM
users_table
GROUP BY
user_id, value_1
HAVING count(*) > 1;
INSERT INTO agg_results(user_id, value_1_agg)
SELECT *
FROM
view_with_window_func;
-- get some statistics from the aggregated results to ensure the results are correct
SELECT count(*), count(DISTINCT user_id), avg(user_id) FROM agg_results;
count | count | avg
-------+-------+---------------------
437 | 100 | 49.9496567505720824
(1 row)
TRUNCATE agg_results;
-- Window function in View works and the query uses coordinator insert ... select
INSERT INTO agg_results(user_id, value_1_agg)
SELECT *
FROM
view_with_window_func
LIMIT
10;
-- get some statistics from the aggregated results to ensure the results are correct
-- since there is a LIMIT but no ORDER BY, we cannot run avg(user_id)
SELECT count(*) FROM agg_results;
count
-------
10
(1 row)
TRUNCATE agg_results;
INSERT INTO agg_results(user_id, value_1_agg)
SELECT
user_id, max(avg)
FROM
(
(SELECT avg(value_3) over (partition by user_id), user_id FROM events_table where event_type IN (1, 2, 3, 4, 5))
UNION ALL
(SELECT avg(value_3) over (partition by user_id), user_id FROM events_table where event_type IN (6, 7, 8, 9, 10))
UNION ALL
(SELECT avg(value_3) over (partition by user_id), user_id FROM events_table where event_type IN (11, 12, 13, 14, 15))
UNION ALL
(SELECT avg(value_3) over (partition by user_id), user_id FROM events_table where event_type IN (16, 17, 18, 19, 20))
UNION ALL
(SELECT avg(value_3) over (partition by user_id), user_id FROM events_table where event_type IN (21, 22, 23, 24, 25))
UNION ALL
(SELECT avg(value_3) over (partition by user_id), user_id FROM events_table where event_type IN (26, 27, 28, 29, 30))
) b
GROUP BY
user_id
LIMIT
5;
-- get some statistics from the aggregated results to ensure the results are correct
-- since there is a LIMIT but no ORDER BY, we cannot test avg or distinct count
SELECT count(*) FROM agg_results;
count
-------
5
(1 row)
TRUNCATE agg_results;
INSERT INTO agg_results(user_id, value_1_agg)
SELECT
user_id, max(avg)
FROM
(
(SELECT avg(value_3) over (partition by user_id), user_id FROM events_table where event_type IN (1, 2, 3, 4, 5))
UNION ALL
(SELECT avg(value_3) over (partition by user_id), user_id FROM events_table where event_type IN (6, 7, 8, 9, 10))
UNION ALL
(SELECT avg(value_3) over (partition by user_id), user_id FROM events_table where event_type IN (11, 12, 13, 14, 15))
UNION ALL
(SELECT avg(value_3) over (partition by user_id), user_id FROM events_table where event_type IN (16, 17, 18, 19, 20))
UNION ALL
(SELECT avg(value_3) over (partition by user_id), user_id FROM events_table where event_type IN (21, 22, 23, 24, 25))
UNION ALL
(SELECT avg(value_3) over (partition by user_id), user_id FROM events_table where event_type IN (26, 27, 28, 29, 30))
) b
GROUP BY
user_id;
-- get some statistics from the aggregated results to ensure the results are correct
SELECT count(*), count(DISTINCT user_id), avg(user_id) FROM agg_results;
count | count | avg
-------+-------+---------------------
94 | 94 | 50.4787234042553191
(1 row)
TRUNCATE agg_results;
INSERT INTO agg_results(user_id, value_1_agg)
SELECT *
FROM (
( SELECT user_id,
sum(counter)
FROM
(SELECT
user_id, sum(value_2) over (partition by user_id) AS counter
FROM
users_table
UNION
SELECT
user_id, sum(value_2) over (partition by user_id) AS counter
FROM
events_table) user_id_1
GROUP BY
user_id)
UNION
(SELECT
user_id, sum(counter)
FROM
(SELECT
user_id, sum(value_2) over (partition by user_id) AS counter
FROM
users_table
UNION
SELECT
user_id, sum(value_2) over (partition by user_id) AS counter
FROM
events_table) user_id_2
GROUP BY
user_id
)
) AS ftop
LIMIT
5;
-- get some statistics from the aggregated results to ensure the results are correct
-- since there is a LIMIT but no ORDER BY, we cannot test avg or distinct count
SELECT count(*) FROM agg_results;
count
-------
5
(1 row)
TRUNCATE agg_results;
INSERT INTO agg_results(user_id, value_1_agg)
SELECT *
FROM (
( SELECT user_id,
sum(counter)
FROM
(SELECT
user_id, sum(value_2) over (partition by user_id) AS counter
FROM
users_table
UNION
SELECT
user_id, sum(value_2) over (partition by user_id) AS counter
FROM
events_table) user_id_1
GROUP BY
user_id)
UNION
(SELECT
user_id, sum(counter)
FROM
(SELECT
user_id, sum(value_2) over (partition by user_id) AS counter
FROM
users_table
UNION
SELECT
user_id, sum(value_2) over (partition by user_id) AS counter
FROM
events_table) user_id_2
GROUP BY
user_id
)
) AS ftop;
-- get some statistics from the aggregated results to ensure the results are correct
SELECT count(*), count(DISTINCT user_id), avg(user_id) FROM agg_results;
count | count | avg
-------+-------+---------------------
101 | 101 | 50.0000000000000000
(1 row)
TRUNCATE agg_results;
-- let's have some queries that Citus shouldn't push down
INSERT INTO agg_results (user_id, agg_time, value_2_agg)
SELECT
user_id, time, rnk
FROM
(
SELECT
*, rank() OVER my_win as rnk
FROM
events_table
WINDOW my_win AS (PARTITION BY event_type ORDER BY time DESC)
) as foo
ORDER BY
3 DESC, 1 DESC, 2 DESC
LIMIT
10;
ERROR: cannot push down this subquery
DETAIL: Window functions with PARTITION BY list missing distribution column is currently unsupported
-- user needs to supply partition by which should
-- include the distribution key
INSERT INTO agg_results (user_id, agg_time, value_2_agg)
SELECT
user_id, time, rnk
FROM
(
SELECT
*, rank() OVER my_win as rnk
FROM
events_table
WINDOW my_win AS ()
) as foo
ORDER BY
3 DESC, 1 DESC, 2 DESC
LIMIT
10;
ERROR: cannot push down this subquery
DETAIL: Window functions without PARTITION BY on distribution column is currently unsupported
-- user needs to supply partition by which should
-- include the distribution key
INSERT INTO agg_results (user_id, agg_time, value_2_agg)
SELECT
user_id, time, rnk
FROM
(
SELECT
*, rank() OVER my_win as rnk
FROM
events_table
WINDOW my_win AS (ORDER BY time DESC)
) as foo
ORDER BY
3 DESC, 1 DESC, 2 DESC
LIMIT
10;
ERROR: cannot push down this subquery
DETAIL: Window functions without PARTITION BY on distribution column is currently unsupported
-- w2 should not be pushed down
INSERT INTO agg_results (user_id, value_1_agg, value_2_agg)
SELECT * FROM
(
SELECT
DISTINCT users_table.user_id, lag(users_table.user_id) OVER w1, rank() OVER w2
FROM
users_table, events_table
WHERE
users_table.user_id = events_table.user_id and
event_type < 25
WINDOW w1 AS (PARTITION BY users_table.user_id, events_table.event_type ORDER BY events_table.time),
w2 AS (PARTITION BY users_table.user_id+1, (events_table.value_2 % 25) ORDER BY events_table.time)
) as foo
LIMIT
10;
ERROR: cannot push down this subquery
DETAIL: Window functions with PARTITION BY list missing distribution column is currently unsupported
-- GROUP BY includes the partition key, but not the WINDOW function
INSERT INTO agg_results (user_id, agg_time, value_2_agg)
SELECT
user_id, time, my_rank
FROM
(
SELECT
user_id, date_trunc('day', time) as time, rank() OVER my_win as my_rank
FROM
events_table
GROUP BY
user_id, date_trunc('day', time)
WINDOW my_win AS (ORDER BY avg(event_type))
) as foo
WHERE
my_rank > 125;
ERROR: cannot push down this subquery
DETAIL: Window functions without PARTITION BY on distribution column is currently unsupported
-- GROUP BY includes the partition key, but not the WINDOW function
INSERT INTO agg_results (user_id, agg_time, value_2_agg)
SELECT
user_id, time, my_rank
FROM
(
SELECT
user_id, date_trunc('day', time) as time, rank() OVER my_win as my_rank
FROM
events_table
GROUP BY
user_id, date_trunc('day', time)
WINDOW my_win AS (PARTITION BY date_trunc('day', time) ORDER BY avg(event_type))
) as foo
WHERE
my_rank > 125;
ERROR: cannot push down this subquery
DETAIL: Window functions with PARTITION BY list missing distribution column is currently unsupported
-- w2 should not be allowed
INSERT INTO agg_results (user_id, value_2_agg, value_3_agg)
SELECT * FROM
(
SELECT
DISTINCT users_table.user_id, lag(users_table.user_id) OVER w1, rank() OVER w2
FROM
users_table, events_table
WHERE
users_table.user_id = events_table.user_id and
event_type < 25
WINDOW w1 AS (PARTITION BY users_table.user_id, events_table.event_type ORDER BY events_table.time),
w2 AS (ORDER BY events_table.time)
) as foo;
ERROR: cannot push down this subquery
DETAIL: Window functions without PARTITION BY on distribution column is currently unsupported
-- unsupported window function with an override
INSERT INTO agg_results(user_id, agg_time, value_2_agg)
SELECT * FROM (
SELECT
user_id, date_trunc('day', time) as time, sum(rank) OVER w2
FROM (
SELECT DISTINCT
user_id as user_id, time, rank() over w1
FROM
users_table
WINDOW
w AS (PARTITION BY time), w1 AS (w ORDER BY value_2, value_3)
) fab
WINDOW
w2 as (PARTITION BY user_id, time)
) a;
ERROR: cannot push down this subquery
DETAIL: Window functions with PARTITION BY list missing distribution column is currently unsupported
-- Subquery in where with unsupported window function
INSERT INTO agg_results(user_id)
SELECT
user_id
FROM
users_table
WHERE
value_2 > 545 AND
value_2 < ALL (
SELECT
avg(value_3) OVER ()
FROM
events_table
WHERE
users_table.user_id = events_table.user_id
)
GROUP BY
user_id;
ERROR: cannot push down this subquery
DETAIL: Window functions without PARTITION BY on distribution column is currently unsupported
-- Aggregate function on distribution column should error out
INSERT INTO agg_results(user_id, value_2_agg)
SELECT * FROM (
SELECT
user_id, COUNT(*) OVER (PARTITION BY sum(user_id), MIN(value_2))
FROM
users_table
GROUP BY
user_id
) a;
ERROR: cannot push down this subquery
DETAIL: Window functions with PARTITION BY list missing distribution column is currently unsupported
-- UNION with only one subquery which has a partition on non-distribution column should
-- error out
INSERT INTO agg_results(user_id, value_1_agg)
SELECT *
FROM (
( SELECT user_id,
sum(counter)
FROM
(SELECT
user_id, sum(value_2) over (partition by user_id) AS counter
FROM
users_table
UNION
SELECT
user_id, sum(value_2) over (partition by user_id) AS counter
FROM
events_table) user_id_1
GROUP BY
user_id)
UNION
(SELECT
user_id, sum(counter)
FROM
(SELECT
user_id, sum(value_2) over (partition by user_id) AS counter
FROM
users_table
UNION
SELECT
user_id, sum(value_2) over (partition by event_type) AS counter
FROM
events_table) user_id_2
GROUP BY
user_id
)
) AS ftop;
ERROR: cannot push down this subquery
DETAIL: Window functions with PARTITION BY list missing distribution column is currently unsupported
DROP VIEW view_with_window_func;

View File

@ -2381,19 +2381,6 @@ ORDER BY
types; types;
ERROR: cannot push down this subquery ERROR: cannot push down this subquery
DETAIL: Offset clause is currently unsupported DETAIL: Offset clause is currently unsupported
-- not supported due to window functions
SELECT user_id,
some_vals
FROM (
SELECT * ,
Row_number() over (PARTITION BY "user_id" ORDER BY "user_id") AS "some_vals",
Random()
FROM users_table
) user_id
ORDER BY 1,
2 limit 10;
ERROR: cannot perform distributed planning on this query
DETAIL: Subqueries without group by clause are not supported yet
-- not supported due to non relation rte -- not supported due to non relation rte
SELECT ("final_query"."event_types") as types, count(*) AS sumOfEventType SELECT ("final_query"."event_types") as types, count(*) AS sumOfEventType
FROM FROM

View File

@ -6,7 +6,6 @@
-- We don't need shard id sequence here, so commented out to prevent conflicts with concurrent tests -- We don't need shard id sequence here, so commented out to prevent conflicts with concurrent tests
-- ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1400000; -- ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1400000;
ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1400000; ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1400000;
SET citus.enable_router_execution TO FALSE; SET citus.enable_router_execution TO FALSE;
CREATE TABLE user_buy_test_table(user_id int, item_id int, buy_count int); CREATE TABLE user_buy_test_table(user_id int, item_id int, buy_count int);
SELECT create_distributed_table('user_buy_test_table', 'user_id'); SELECT create_distributed_table('user_buy_test_table', 'user_id');
@ -29,19 +28,6 @@ SELECT create_distributed_table('users_return_test_table', 'user_id');
INSERT INTO users_return_test_table VALUES(4,1,1); INSERT INTO users_return_test_table VALUES(4,1,1);
INSERT INTO users_return_test_table VALUES(1,3,1); INSERT INTO users_return_test_table VALUES(1,3,1);
INSERT INTO users_return_test_table VALUES(3,2,2); INSERT INTO users_return_test_table VALUES(3,2,2);
CREATE TABLE users_ref_test_table(id int, it_name varchar(25), k_no int);
SELECT create_reference_table('users_ref_test_table');
create_reference_table
------------------------
(1 row)
INSERT INTO users_ref_test_table VALUES(1,'User_1',45);
INSERT INTO users_ref_test_table VALUES(2,'User_2',46);
INSERT INTO users_ref_test_table VALUES(3,'User_3',47);
INSERT INTO users_ref_test_table VALUES(4,'User_4',48);
INSERT INTO users_ref_test_table VALUES(5,'User_5',49);
INSERT INTO users_ref_test_table VALUES(6,'User_6',50);
-- Simple Join test with reference table -- Simple Join test with reference table
SELECT count(*) FROM SELECT count(*) FROM
(SELECT random() FROM user_buy_test_table JOIN users_ref_test_table (SELECT random() FROM user_buy_test_table JOIN users_ref_test_table

File diff suppressed because it is too large Load Diff

View File

@ -2,10 +2,8 @@
-- multi behavioral analytics -- multi behavioral analytics
-- this file is intended to create the tables required for the tests
-- --
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1400000; ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1400000;
ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1400000; ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1400000;
SET citus.shard_replication_factor = 1; SET citus.shard_replication_factor = 1;
SET citus.shard_count = 4; SET citus.shard_count = 4;
@ -30,6 +28,15 @@ SELECT create_distributed_table('agg_results_third', 'user_id');
CREATE TABLE agg_results_fourth (user_id int, value_1_agg int, value_2_agg int, value_3_agg float, value_4_agg bigint, agg_time timestamp); CREATE TABLE agg_results_fourth (user_id int, value_1_agg int, value_2_agg int, value_3_agg float, value_4_agg bigint, agg_time timestamp);
SELECT create_distributed_table('agg_results_fourth', 'user_id'); SELECT create_distributed_table('agg_results_fourth', 'user_id');
-- Small reference table (six rows, ids 1..6) created once in this shared
-- setup file so that tests joining distributed tables with a reference
-- table can use it without re-creating it themselves.
CREATE TABLE users_ref_test_table(id int, it_name varchar(25), k_no int);
SELECT create_reference_table('users_ref_test_table');
INSERT INTO users_ref_test_table VALUES(1,'User_1',45);
INSERT INTO users_ref_test_table VALUES(2,'User_2',46);
INSERT INTO users_ref_test_table VALUES(3,'User_3',47);
INSERT INTO users_ref_test_table VALUES(4,'User_4',48);
INSERT INTO users_ref_test_table VALUES(5,'User_5',49);
INSERT INTO users_ref_test_table VALUES(6,'User_6',50);
COPY users_table FROM '@abs_srcdir@/data/users_table.data' WITH CSV; COPY users_table FROM '@abs_srcdir@/data/users_table.data' WITH CSV;
COPY events_table FROM '@abs_srcdir@/data/events_table.data' WITH CSV; COPY events_table FROM '@abs_srcdir@/data/events_table.data' WITH CSV;

View File

@ -36,7 +36,7 @@ test: multi_load_data
test: multi_behavioral_analytics_create_table test: multi_behavioral_analytics_create_table
test: multi_behavioral_analytics_basics multi_behavioral_analytics_single_shard_queries multi_insert_select_non_pushable_queries test: multi_behavioral_analytics_basics multi_behavioral_analytics_single_shard_queries multi_insert_select_non_pushable_queries
test: multi_insert_select test: multi_insert_select multi_insert_select_window
# --- # ---
# Tests for partitioning support # Tests for partitioning support
@ -51,7 +51,7 @@ test: multi_deparse_shard_query multi_distributed_transaction_id
test: multi_basic_queries multi_complex_expressions test: multi_basic_queries multi_complex_expressions
test: multi_explain test: multi_explain
test: multi_subquery multi_subquery_complex_queries multi_subquery_behavioral_analytics test: multi_subquery multi_subquery_complex_queries multi_subquery_behavioral_analytics
test: multi_subquery_complex_reference_clause test: multi_subquery_complex_reference_clause multi_subquery_window_functions
test: multi_subquery_in_where_reference_clause test: multi_subquery_in_where_reference_clause
test: multi_subquery_union multi_subquery_in_where_clause multi_subquery_misc test: multi_subquery_union multi_subquery_in_where_clause multi_subquery_misc
test: multi_reference_table test: multi_reference_table

View File

@ -51,6 +51,19 @@ SELECT create_distributed_table('agg_results_fourth', 'user_id');
(1 row) (1 row)
CREATE TABLE users_ref_test_table(id int, it_name varchar(25), k_no int);
SELECT create_reference_table('users_ref_test_table');
create_reference_table
------------------------
(1 row)
INSERT INTO users_ref_test_table VALUES(1,'User_1',45);
INSERT INTO users_ref_test_table VALUES(2,'User_2',46);
INSERT INTO users_ref_test_table VALUES(3,'User_3',47);
INSERT INTO users_ref_test_table VALUES(4,'User_4',48);
INSERT INTO users_ref_test_table VALUES(5,'User_5',49);
INSERT INTO users_ref_test_table VALUES(6,'User_6',50);
COPY users_table FROM '@abs_srcdir@/data/users_table.data' WITH CSV; COPY users_table FROM '@abs_srcdir@/data/users_table.data' WITH CSV;
COPY events_table FROM '@abs_srcdir@/data/events_table.data' WITH CSV; COPY events_table FROM '@abs_srcdir@/data/events_table.data' WITH CSV;
-- create indexes for -- create indexes for

View File

@ -0,0 +1,769 @@
-- ===================================================================
-- test insert select functionality for window functions
-- ===================================================================
TRUNCATE agg_results;
INSERT INTO agg_results (user_id, agg_time, value_2_agg)
SELECT
user_id, time, rnk
FROM
(
SELECT
*, rank() OVER my_win as rnk
FROM
events_table
WINDOW my_win AS (PARTITION BY user_id ORDER BY time DESC)
) as foo;
-- get some statistics from the aggregated results to ensure the results are correct
SELECT count(*), count(DISTINCT user_id), avg(user_id) FROM agg_results;
TRUNCATE agg_results;
-- the same test with different syntax
INSERT INTO agg_results (user_id, agg_time, value_2_agg)
SELECT
user_id, time, rnk
FROM
(
SELECT
*, rank() OVER (PARTITION BY user_id ORDER BY time DESC) as rnk
FROM
events_table
) as foo;
-- get some statistics from the aggregated results to ensure the results are correct
SELECT count(*), count(DISTINCT user_id), avg(user_id) FROM agg_results;
TRUNCATE agg_results;
-- similar test with lag
INSERT INTO agg_results (user_id, agg_time, value_2_agg, value_3_agg)
SELECT
user_id, time, lag_event_type, row_no
FROM
(
SELECT
*, lag(event_type) OVER my_win as lag_event_type, row_number() OVER my_win as row_no
FROM
events_table WINDOW my_win AS (PARTITION BY user_id ORDER BY time DESC)
) as foo;
-- get some statistics from the aggregated results to ensure the results are correct
SELECT count(*), count(DISTINCT user_id), avg(user_id) FROM agg_results;
TRUNCATE agg_results;
-- simple window function, partitioned and grouped by on the distribution key
INSERT INTO agg_results (user_id, value_1_agg, value_2_agg)
SELECT
user_id, rnk, tme
FROM
(
SELECT
user_id, rank() OVER my_win as rnk, avg(value_2) as tme
FROM
events_table
GROUP BY
user_id, date_trunc('day', time)
WINDOW my_win AS (PARTITION BY user_id ORDER BY avg(event_type) DESC)
) as foo;
-- get some statistics from the aggregated results to ensure the results are correct
SELECT count(*), count(DISTINCT user_id), avg(user_id) FROM agg_results;
TRUNCATE agg_results;
-- top level query has a group by on the result of the window function
INSERT INTO agg_results (user_id, agg_time, value_2_agg)
SELECT
min(user_id), min(time), lag_event_type
FROM
(
SELECT
*, lag(event_type) OVER my_win as lag_event_type
FROM
events_table WINDOW my_win AS (PARTITION BY user_id ORDER BY time DESC)
) as foo
GROUP BY
lag_event_type;
-- get some statistics from the aggregated results to ensure the results are correct
SELECT count(*), count(DISTINCT user_id), avg(user_id) FROM agg_results;
TRUNCATE agg_results;
-- window functions should work along with joins as well
INSERT INTO agg_results (user_id, value_1_agg, value_2_agg)
SELECT * FROM
(
SELECT
DISTINCT users_table.user_id, lag(users_table.user_id) OVER w1, rank() OVER w1
FROM
users_table, events_table
WHERE
users_table.user_id = events_table.user_id and
event_type < 25
WINDOW w1 AS (PARTITION BY users_table.user_id, events_table.event_type ORDER BY events_table.time)
) as foo;
-- get some statistics from the aggregated results to ensure the results are correct
SELECT count(*), count(DISTINCT user_id), avg(user_id) FROM agg_results;
TRUNCATE agg_results;
-- two window functions in a single subquery should work fine as well
INSERT INTO agg_results (user_id, value_1_agg, value_2_agg)
SELECT * FROM
(
SELECT
DISTINCT users_table.user_id, lag(users_table.user_id) OVER w1, rank() OVER w2
FROM
users_table, events_table
WHERE
users_table.user_id = events_table.user_id and
event_type < 25
WINDOW w1 AS (PARTITION BY users_table.user_id, events_table.event_type ORDER BY events_table.time),
w2 AS (PARTITION BY users_table.user_id, (events_table.value_2 % 25) ORDER BY events_table.time)
) as foo;
-- get some statistics from the aggregated results to ensure the results are correct
SELECT count(*), count(DISTINCT user_id), avg(user_id) FROM agg_results;
TRUNCATE agg_results;
-- window functions should be fine within subquery joins
INSERT INTO agg_results (user_id, value_1_agg, value_2_agg, value_3_agg)
SELECT sub_1.user_id, max(lag_1), max(rank_1), max(rank_2) FROM
(
SELECT
DISTINCT users_table.user_id, lag(users_table.user_id) OVER w1 as lag_1, rank() OVER w2 as rank_1
FROM
users_table, events_table
WHERE
users_table.user_id = events_table.user_id and
event_type < 25
WINDOW w1 AS (PARTITION BY users_table.user_id, events_table.event_type ORDER BY events_table.time),
w2 AS (PARTITION BY users_table.user_id, (events_table.value_2 % 25) ORDER BY events_table.time)
) as sub_1
JOIN
(
SELECT
DISTINCT users_table.user_id, lag(users_table.user_id) OVER w1 as lag_2, rank() OVER w2 as rank_2
FROM
users_table, events_table
WHERE
users_table.user_id = events_table.user_id and
event_type < 25
WINDOW w1 AS (PARTITION BY users_table.user_id, events_table.value_2 ORDER BY events_table.time),
w2 AS (PARTITION BY users_table.user_id, (events_table.value_2 % 50) ORDER BY events_table.time)
) as sub_2
ON(sub_1.user_id = sub_2.user_id)
GROUP BY
sub_1.user_id;
-- get some statistics from the aggregated results to ensure the results are correct
SELECT count(*), count(DISTINCT user_id), avg(user_id) FROM agg_results;
TRUNCATE agg_results;
-- GROUP BYs and PARTITION BYs should work fine together
INSERT INTO agg_results (user_id, agg_time, value_2_agg)
SELECT
avg(user_id), max(time), my_rank
FROM
(
SELECT
user_id, date_trunc('day', time) as time, rank() OVER my_win as my_rank
FROM
events_table
GROUP BY
user_id, date_trunc('day', time)
WINDOW my_win AS (PARTITION BY user_id ORDER BY count(*) DESC)
) as foo
WHERE
my_rank > 5
GROUP BY
my_rank;
-- get some statistics from the aggregated results to ensure the results are correct
SELECT count(*), count(DISTINCT user_id), avg(user_id) FROM agg_results;
TRUNCATE agg_results;
-- aggregates in the PARTITION BY are also allowed
INSERT INTO agg_results (user_id, agg_time, value_2_agg)
SELECT
avg(user_id), max(time), my_rank
FROM
(
SELECT
user_id, date_trunc('day', time) as time, rank() OVER my_win as my_rank
FROM
events_table
GROUP BY
user_id, date_trunc('day', time)
WINDOW my_win AS (PARTITION BY user_id, avg(event_type%10)::int ORDER BY count(*) DESC)
) as foo
WHERE
my_rank > 0
GROUP BY
my_rank;
-- get some statistics from the aggregated results to ensure the results are correct
SELECT count(*), count(DISTINCT user_id), avg(user_id) FROM agg_results;
TRUNCATE agg_results;
-- GROUP BY should not necessarily be inclusive of partitioning
-- but this query doesn't make much sense
INSERT INTO agg_results (user_id, value_1_agg)
SELECT
avg(user_id), my_rank
FROM
(
SELECT
user_id, rank() OVER my_win as my_rank
FROM
events_table
GROUP BY
user_id
WINDOW my_win AS (PARTITION BY user_id, max(event_type) ORDER BY count(*) DESC)
) as foo
GROUP BY
my_rank;
-- get some statistics from the aggregated results to ensure the results are correct
SELECT count(*), count(DISTINCT user_id), avg(user_id) FROM agg_results;
TRUNCATE agg_results;
-- Group by has more columns than partition by which uses coordinator insert ... select
INSERT INTO agg_results(user_id, value_2_agg)
SELECT * FROM (
SELECT
DISTINCT user_id, SUM(value_2) OVER (PARTITION BY user_id)
FROM
users_table
GROUP BY
user_id, value_1, value_2
) a
ORDER BY
2 DESC, 1
LIMIT
10;
-- get some statistics from the aggregated results to ensure the results are correct
SELECT count(*), count(DISTINCT user_id), avg(user_id) FROM agg_results;
TRUNCATE agg_results;
INSERT INTO agg_results(user_id, value_2_agg)
SELECT user_id, max(sum) FROM (
SELECT
user_id, SUM(value_2) OVER (PARTITION BY user_id, value_1)
FROM
users_table
GROUP BY
user_id, value_1, value_2
) a
GROUP BY user_id;
-- get some statistics from the aggregated results to ensure the results are correct
SELECT count(*), count(DISTINCT user_id), avg(user_id) FROM agg_results;
TRUNCATE agg_results;
-- Subquery in where with window function
INSERT INTO agg_results(user_id)
SELECT
user_id
FROM
users_table
WHERE
value_2 > 545 AND
value_2 < ALL (
SELECT
avg(value_3) OVER (PARTITION BY user_id)
FROM
events_table
WHERE
users_table.user_id = events_table.user_id
)
GROUP BY
user_id;
-- get some statistics from the aggregated results to ensure the results are correct
SELECT count(*), count(DISTINCT user_id), avg(user_id) FROM agg_results;
TRUNCATE agg_results;
-- Partition by with aggregate functions. This query does not make much sense since the
-- result of aggregate function will be the same for every row in a partition and it is
-- not going to affect the group that the count function will work on.
INSERT INTO agg_results(user_id, value_2_agg)
SELECT * FROM (
SELECT
user_id, COUNT(*) OVER (PARTITION BY user_id, MIN(value_2))
FROM
users_table
GROUP BY
1
) a;
-- get some statistics from the aggregated results to ensure the results are correct
SELECT count(*), count(DISTINCT user_id), avg(user_id) FROM agg_results;
TRUNCATE agg_results;
-- Some more nested queries
INSERT INTO agg_results(user_id, value_2_agg, value_3_agg, value_4_agg)
SELECT
user_id, rank, SUM(ABS(value_2 - value_3)) AS difference, COUNT(*) AS distinct_users
FROM (
SELECT
*, rank() OVER (PARTITION BY user_id ORDER BY value_2 DESC)
FROM (
SELECT
user_id, value_2, sum(value_3) OVER (PARTITION BY user_id, value_2) as value_3
FROM users_table
) AS A
) AS A
GROUP BY
user_id, rank;
-- get some statistics from the aggregated results to ensure the results are correct
SELECT count(*), count(DISTINCT user_id), avg(user_id) FROM agg_results;
TRUNCATE agg_results;
INSERT INTO agg_results(user_id, value_1_agg)
SELECT * FROM (
SELECT DISTINCT
f3.user_id, ABS(f2.sum - f3.sum)
FROM (
SELECT DISTINCT
user_id, sum(value_3) OVER (PARTITION BY user_id)
FROM
users_table
GROUP BY
user_id, value_3
) f3,
(
SELECT DISTINCT
user_id, sum(value_2) OVER (PARTITION BY user_id)
FROM
users_table
GROUP BY
user_id, value_2
) f2
WHERE
f3.user_id=f2.user_id
) a;
-- get some statistics from the aggregated results to ensure the results are correct
SELECT count(*), count(DISTINCT user_id), avg(user_id) FROM agg_results;
TRUNCATE agg_results;
-- test with reference table partitioned on columns from both
INSERT INTO agg_results(user_id, value_1_agg)
SELECT *
FROM
(
SELECT
DISTINCT user_id, count(id) OVER (PARTITION BY user_id, id)
FROM
users_table, users_ref_test_table
) a;
-- get some statistics from the aggregated results to ensure the results are correct
SELECT count(*), count(DISTINCT user_id), avg(user_id) FROM agg_results;
TRUNCATE agg_results;
-- Window functions with HAVING clause
INSERT INTO agg_results (user_id, value_1_agg)
SELECT * FROM (
SELECT
DISTINCT user_id, rank() OVER (PARTITION BY user_id ORDER BY value_1)
FROM
users_table
GROUP BY
user_id, value_1 HAVING count(*) > 1
) a;
-- get some statistics from the aggregated results to ensure the results are correct
SELECT count(*), count(DISTINCT user_id), avg(user_id) FROM agg_results;
TRUNCATE agg_results;
-- Window functions with HAVING clause which uses coordinator insert ... select
INSERT INTO agg_results (user_id, value_1_agg)
SELECT * FROM (
SELECT
DISTINCT user_id, rank() OVER (PARTITION BY user_id ORDER BY value_1)
FROM
users_table
GROUP BY
user_id, value_1 HAVING count(*) > 1
) a
ORDER BY
2 DESC, 1
LIMIT
10;
-- get some statistics from the aggregated results to ensure the results are correct
SELECT count(*), count(DISTINCT user_id), avg(user_id) FROM agg_results;
TRUNCATE agg_results;
-- Window function in View works
-- The view groups users_table by (user_id, value_1), keeps only groups with
-- more than one row, and ranks the remaining groups within each user_id by
-- value_1. The window is partitioned on user_id only; the rank column keeps
-- its default name because no alias is given.
CREATE VIEW view_with_window_func AS
SELECT
DISTINCT user_id, rank() OVER (PARTITION BY user_id ORDER BY value_1)
FROM
users_table
GROUP BY
user_id, value_1
HAVING count(*) > 1;
INSERT INTO agg_results(user_id, value_1_agg)
SELECT *
FROM
view_with_window_func;
-- get some statistics from the aggregated results to ensure the results are correct
SELECT count(*), count(DISTINCT user_id), avg(user_id) FROM agg_results;
TRUNCATE agg_results;
-- Window function in View works and the query uses coordinator insert ... select
INSERT INTO agg_results(user_id, value_1_agg)
SELECT *
FROM
view_with_window_func
LIMIT
10;
-- get some statistics from the aggregated results to ensure the results are correct
-- since there is a limit but not order, we cannot run avg(user_id)
SELECT count(*) FROM agg_results;
TRUNCATE agg_results;
INSERT INTO agg_results(user_id, value_1_agg)
SELECT
user_id, max(avg)
FROM
(
(SELECT avg(value_3) over (partition by user_id), user_id FROM events_table where event_type IN (1, 2, 3, 4, 5))
UNION ALL
(SELECT avg(value_3) over (partition by user_id), user_id FROM events_table where event_type IN (6, 7, 8, 9, 10))
UNION ALL
(SELECT avg(value_3) over (partition by user_id), user_id FROM events_table where event_type IN (11, 12, 13, 14, 15))
UNION ALL
(SELECT avg(value_3) over (partition by user_id), user_id FROM events_table where event_type IN (16, 17, 18, 19, 20))
UNION ALL
(SELECT avg(value_3) over (partition by user_id), user_id FROM events_table where event_type IN (21, 22, 23, 24, 25))
UNION ALL
(SELECT avg(value_3) over (partition by user_id), user_id FROM events_table where event_type IN (26, 27, 28, 29, 30))
) b
GROUP BY
user_id
LIMIT
5;
-- get some statistics from the aggregated results to ensure the results are correct
-- since there is a limit but not order, we cannot test avg or distinct count
SELECT count(*) FROM agg_results;
TRUNCATE agg_results;
INSERT INTO agg_results(user_id, value_1_agg)
SELECT
user_id, max(avg)
FROM
(
(SELECT avg(value_3) over (partition by user_id), user_id FROM events_table where event_type IN (1, 2, 3, 4, 5))
UNION ALL
(SELECT avg(value_3) over (partition by user_id), user_id FROM events_table where event_type IN (6, 7, 8, 9, 10))
UNION ALL
(SELECT avg(value_3) over (partition by user_id), user_id FROM events_table where event_type IN (11, 12, 13, 14, 15))
UNION ALL
(SELECT avg(value_3) over (partition by user_id), user_id FROM events_table where event_type IN (16, 17, 18, 19, 20))
UNION ALL
(SELECT avg(value_3) over (partition by user_id), user_id FROM events_table where event_type IN (21, 22, 23, 24, 25))
UNION ALL
(SELECT avg(value_3) over (partition by user_id), user_id FROM events_table where event_type IN (26, 27, 28, 29, 30))
) b
GROUP BY
user_id;
-- get some statistics from the aggregated results to ensure the results are correct
SELECT count(*), count(DISTINCT user_id), avg(user_id) FROM agg_results;
TRUNCATE agg_results;
INSERT INTO agg_results(user_id, value_1_agg)
SELECT *
FROM (
( SELECT user_id,
sum(counter)
FROM
(SELECT
user_id, sum(value_2) over (partition by user_id) AS counter
FROM
users_table
UNION
SELECT
user_id, sum(value_2) over (partition by user_id) AS counter
FROM
events_table) user_id_1
GROUP BY
user_id)
UNION
(SELECT
user_id, sum(counter)
FROM
(SELECT
user_id, sum(value_2) over (partition by user_id) AS counter
FROM
users_table
UNION
SELECT
user_id, sum(value_2) over (partition by user_id) AS counter
FROM
events_table) user_id_2
GROUP BY
user_id
)
) AS ftop
LIMIT
5;
-- get some statistics from the aggregated results to ensure the results are correct
-- since there is a limit but not order, we cannot test avg or distinct count
SELECT count(*) FROM agg_results;
TRUNCATE agg_results;
INSERT INTO agg_results(user_id, value_1_agg)
SELECT *
FROM (
( SELECT user_id,
sum(counter)
FROM
(SELECT
user_id, sum(value_2) over (partition by user_id) AS counter
FROM
users_table
UNION
SELECT
user_id, sum(value_2) over (partition by user_id) AS counter
FROM
events_table) user_id_1
GROUP BY
user_id)
UNION
(SELECT
user_id, sum(counter)
FROM
(SELECT
user_id, sum(value_2) over (partition by user_id) AS counter
FROM
users_table
UNION
SELECT
user_id, sum(value_2) over (partition by user_id) AS counter
FROM
events_table) user_id_2
GROUP BY
user_id
)
) AS ftop;
-- get some statistics from the aggregated results to ensure the results are correct
SELECT count(*), count(DISTINCT user_id), avg(user_id) FROM agg_results;
TRUNCATE agg_results;
-- let's have some queries that Citus shouldn't push down
INSERT INTO agg_results (user_id, agg_time, value_2_agg)
SELECT
user_id, time, rnk
FROM
(
SELECT
*, rank() OVER my_win as rnk
FROM
events_table
WINDOW my_win AS (PARTITION BY event_type ORDER BY time DESC)
) as foo
ORDER BY
3 DESC, 1 DESC, 2 DESC
LIMIT
10;
-- user needs to supply partition by which should
-- include the distribution key
INSERT INTO agg_results (user_id, agg_time, value_2_agg)
SELECT
user_id, time, rnk
FROM
(
SELECT
*, rank() OVER my_win as rnk
FROM
events_table
WINDOW my_win AS ()
) as foo
ORDER BY
3 DESC, 1 DESC, 2 DESC
LIMIT
10;
-- user needs to supply partition by which should
-- include the distribution key
INSERT INTO agg_results (user_id, agg_time, value_2_agg)
SELECT
user_id, time, rnk
FROM
(
SELECT
*, rank() OVER my_win as rnk
FROM
events_table
WINDOW my_win AS (ORDER BY time DESC)
) as foo
ORDER BY
3 DESC, 1 DESC, 2 DESC
LIMIT
10;
-- w2 should not be pushed down
INSERT INTO agg_results (user_id, value_1_agg, value_2_agg)
SELECT * FROM
(
SELECT
DISTINCT users_table.user_id, lag(users_table.user_id) OVER w1, rank() OVER w2
FROM
users_table, events_table
WHERE
users_table.user_id = events_table.user_id and
event_type < 25
WINDOW w1 AS (PARTITION BY users_table.user_id, events_table.event_type ORDER BY events_table.time),
w2 AS (PARTITION BY users_table.user_id+1, (events_table.value_2 % 25) ORDER BY events_table.time)
) as foo
LIMIT
10;
-- GROUP BY includes the partition key, but not the WINDOW function
INSERT INTO agg_results (user_id, agg_time, value_2_agg)
SELECT
user_id, time, my_rank
FROM
(
SELECT
user_id, date_trunc('day', time) as time, rank() OVER my_win as my_rank
FROM
events_table
GROUP BY
user_id, date_trunc('day', time)
WINDOW my_win AS (ORDER BY avg(event_type))
) as foo
WHERE
my_rank > 125;
-- GROUP BY includes the partition key, but not the WINDOW function
INSERT INTO agg_results (user_id, agg_time, value_2_agg)
SELECT
user_id, time, my_rank
FROM
(
SELECT
user_id, date_trunc('day', time) as time, rank() OVER my_win as my_rank
FROM
events_table
GROUP BY
user_id, date_trunc('day', time)
WINDOW my_win AS (PARTITION BY date_trunc('day', time) ORDER BY avg(event_type))
) as foo
WHERE
my_rank > 125;
-- w2 should not be allowed
INSERT INTO agg_results (user_id, value_2_agg, value_3_agg)
SELECT * FROM
(
SELECT
DISTINCT users_table.user_id, lag(users_table.user_id) OVER w1, rank() OVER w2
FROM
users_table, events_table
WHERE
users_table.user_id = events_table.user_id and
event_type < 25
WINDOW w1 AS (PARTITION BY users_table.user_id, events_table.event_type ORDER BY events_table.time),
w2 AS (ORDER BY events_table.time)
) as foo;
-- unsupported window function with an override
INSERT INTO agg_results(user_id, agg_time, value_2_agg)
SELECT * FROM (
SELECT
user_id, date_trunc('day', time) as time, sum(rank) OVER w2
FROM (
SELECT DISTINCT
user_id as user_id, time, rank() over w1
FROM
users_table
WINDOW
w AS (PARTITION BY time), w1 AS (w ORDER BY value_2, value_3)
) fab
WINDOW
w2 as (PARTITION BY user_id, time)
) a;
-- Subquery in where with unsupported window function
INSERT INTO agg_results(user_id)
SELECT
user_id
FROM
users_table
WHERE
value_2 > 545 AND
value_2 < ALL (
SELECT
avg(value_3) OVER ()
FROM
events_table
WHERE
users_table.user_id = events_table.user_id
)
GROUP BY
user_id;
-- Aggregate function on distribution column should error out
INSERT INTO agg_results(user_id, value_2_agg)
SELECT * FROM (
SELECT
user_id, COUNT(*) OVER (PARTITION BY sum(user_id), MIN(value_2))
FROM
users_table
GROUP BY
user_id
) a;
-- UNION with only one subquery which has a partition on non-distribution column should
-- error out
INSERT INTO agg_results(user_id, value_1_agg)
SELECT *
FROM (
( SELECT user_id,
sum(counter)
FROM
(SELECT
user_id, sum(value_2) over (partition by user_id) AS counter
FROM
users_table
UNION
SELECT
user_id, sum(value_2) over (partition by user_id) AS counter
FROM
events_table) user_id_1
GROUP BY
user_id)
UNION
(SELECT
user_id, sum(counter)
FROM
(SELECT
user_id, sum(value_2) over (partition by user_id) AS counter
FROM
users_table
UNION
SELECT
user_id, sum(value_2) over (partition by event_type) AS counter
FROM
events_table) user_id_2
GROUP BY
user_id
)
) AS ftop;
DROP VIEW view_with_window_func;

View File

@ -2160,18 +2160,6 @@ GROUP BY
ORDER BY ORDER BY
types; types;
-- not supported due to window functions
SELECT user_id,
some_vals
FROM (
SELECT * ,
Row_number() over (PARTITION BY "user_id" ORDER BY "user_id") AS "some_vals",
Random()
FROM users_table
) user_id
ORDER BY 1,
2 limit 10;
-- not supported due to non relation rte -- not supported due to non relation rte
SELECT ("final_query"."event_types") as types, count(*) AS sumOfEventType SELECT ("final_query"."event_types") as types, count(*) AS sumOfEventType
FROM FROM

View File

@ -23,15 +23,6 @@ INSERT INTO users_return_test_table VALUES(4,1,1);
INSERT INTO users_return_test_table VALUES(1,3,1); INSERT INTO users_return_test_table VALUES(1,3,1);
INSERT INTO users_return_test_table VALUES(3,2,2); INSERT INTO users_return_test_table VALUES(3,2,2);
CREATE TABLE users_ref_test_table(id int, it_name varchar(25), k_no int);
SELECT create_reference_table('users_ref_test_table');
INSERT INTO users_ref_test_table VALUES(1,'User_1',45);
INSERT INTO users_ref_test_table VALUES(2,'User_2',46);
INSERT INTO users_ref_test_table VALUES(3,'User_3',47);
INSERT INTO users_ref_test_table VALUES(4,'User_4',48);
INSERT INTO users_ref_test_table VALUES(5,'User_5',49);
INSERT INTO users_ref_test_table VALUES(6,'User_6',50);
-- Simple Join test with reference table -- Simple Join test with reference table
SELECT count(*) FROM SELECT count(*) FROM
(SELECT random() FROM user_buy_test_table JOIN users_ref_test_table (SELECT random() FROM user_buy_test_table JOIN users_ref_test_table

View File

@ -0,0 +1,706 @@
-- ===================================================================
-- test multi subquery functionality for window functions
-- ===================================================================
-- view with a window function partitioned by user_id; queried by the
-- "Window function in View works" test and dropped at the end of this file
CREATE VIEW subq AS
SELECT
DISTINCT user_id, rank() OVER (PARTITION BY user_id ORDER BY value_1)
FROM
users_table
GROUP BY
user_id, value_1
HAVING count(*) > 1;
-- rank() over a named window (my_win) partitioned by user_id
SELECT
user_id, time, rnk
FROM
(
SELECT
*, rank() OVER my_win as rnk
FROM
events_table
WINDOW my_win AS (PARTITION BY user_id ORDER BY time DESC)
) as foo
ORDER BY
3 DESC, 1 DESC, 2 DESC
LIMIT
10;
-- the same test with different syntax: an inline OVER (...) clause instead of
-- a named window
SELECT
user_id, time, rnk
FROM
(
SELECT
*, rank() OVER (PARTITION BY user_id ORDER BY time DESC) as rnk
FROM
events_table
) as foo
ORDER BY
3 DESC, 1 DESC, 2 DESC
LIMIT
10;
-- similar test with lag() and row_number() sharing one named window
SELECT
user_id, time, lag_event_type, row_no
FROM
(
SELECT
*, lag(event_type) OVER my_win as lag_event_type, row_number() OVER my_win as row_no
FROM
events_table WINDOW my_win AS (PARTITION BY user_id ORDER BY time DESC)
) as foo
ORDER BY
4 DESC, 3 DESC NULLS LAST, 1 DESC, 2 DESC
LIMIT
10;
-- simple window function, partitioned by and grouped on the distribution key
-- (the window orders the groups by an aggregate, avg(event_type))
SELECT
user_id, rnk, avg_val_2
FROM
(
SELECT
user_id, rank() OVER my_win as rnk, avg(value_2) as avg_val_2
FROM
events_table
GROUP BY
user_id, date_trunc('day', time)
WINDOW my_win AS (PARTITION BY user_id ORDER BY avg(event_type) DESC)
) as foo
ORDER BY
2 DESC, 1 DESC, 3 DESC
LIMIT
10;
-- top level query has a group by on the result of the window function
-- (lag_event_type is computed in the subquery and grouped on outside it)
SELECT
min(user_id), min(time), lag_event_type, count(*)
FROM
(
SELECT
*, lag(event_type) OVER my_win as lag_event_type
FROM
events_table WINDOW my_win AS (PARTITION BY user_id ORDER BY time DESC)
) as foo
GROUP BY
lag_event_type
ORDER BY
3 DESC NULLS LAST, 1 DESC, 2 DESC
LIMIT
10;
-- window functions should work along with joins as well
-- (lag() and rank() both use w1, partitioned by user_id and event_type)
SELECT * FROM
(
SELECT
DISTINCT users_table.user_id, lag(users_table.user_id) OVER w1, rank() OVER w1
FROM
users_table, events_table
WHERE
users_table.user_id = events_table.user_id and
event_type < 25
WINDOW w1 AS (PARTITION BY users_table.user_id, events_table.event_type ORDER BY events_table.time)
) as foo
ORDER BY 3 DESC, 1 DESC, 2 DESC NULLS LAST
LIMIT 10;
-- two window functions over two different windows (w1, w2) in a single subquery
-- should work fine as well; both PARTITION BY lists start with users_table.user_id
SELECT * FROM
(
SELECT
DISTINCT users_table.user_id, lag(users_table.user_id) OVER w1, rank() OVER w2
FROM
users_table, events_table
WHERE
users_table.user_id = events_table.user_id and
event_type < 25
WINDOW w1 AS (PARTITION BY users_table.user_id, events_table.event_type ORDER BY events_table.time),
w2 AS (PARTITION BY users_table.user_id, (events_table.value_2 % 25) ORDER BY events_table.time)
) as foo
ORDER BY 3 DESC, 1 DESC, 2 DESC NULLS LAST
LIMIT 10;
-- window functions should be fine within subquery joins
-- (sub_1 and sub_2 each define two windows that include users_table.user_id,
-- and the two subqueries are joined on user_id)
SELECT sub_1.user_id, max(lag_1), max(rank_1), max(rank_2) FROM
(
SELECT
DISTINCT users_table.user_id, lag(users_table.user_id) OVER w1 as lag_1, rank() OVER w2 as rank_1
FROM
users_table, events_table
WHERE
users_table.user_id = events_table.user_id and
event_type < 25
WINDOW w1 AS (PARTITION BY users_table.user_id, events_table.event_type ORDER BY events_table.time),
w2 AS (PARTITION BY users_table.user_id, (events_table.value_2 % 25) ORDER BY events_table.time)
) as sub_1
JOIN
(
SELECT
DISTINCT users_table.user_id, lag(users_table.user_id) OVER w1 as lag_2, rank() OVER w2 as rank_2
FROM
users_table, events_table
WHERE
users_table.user_id = events_table.user_id and
event_type < 25
WINDOW w1 AS (PARTITION BY users_table.user_id, events_table.value_2 ORDER BY events_table.time),
w2 AS (PARTITION BY users_table.user_id, (events_table.value_2 % 50) ORDER BY events_table.time)
) as sub_2
ON(sub_1.user_id = sub_2.user_id)
GROUP BY
sub_1.user_id
ORDER BY 3 DESC, 4 DESC, 1 DESC, 2 DESC NULLS LAST
LIMIT 10;
-- GROUP BYs and PARTITION BYs should work fine together
-- (the window ranks the per-user, per-day groups by count(*))
SELECT
avg(user_id), max(time), my_rank
FROM
(
SELECT
user_id, date_trunc('day', time) as time, rank() OVER my_win as my_rank
FROM
events_table
GROUP BY
user_id, date_trunc('day', time)
WINDOW my_win AS (PARTITION BY user_id ORDER BY count(*) DESC)
) as foo
WHERE
my_rank > 5
GROUP BY
my_rank
ORDER BY
3 DESC, 1 DESC,2 DESC
LIMIT
10;
-- aggregates in the PARTITION BY are also allowed
-- (as long as user_id itself is part of the PARTITION BY list)
SELECT
avg(user_id), max(time), my_rank
FROM
(
SELECT
user_id, date_trunc('day', time) as time, rank() OVER my_win as my_rank
FROM
events_table
GROUP BY
user_id, date_trunc('day', time)
WINDOW my_win AS (PARTITION BY user_id, avg(event_type%10)::int ORDER BY count(*) DESC)
) as foo
WHERE
my_rank > 0
GROUP BY
my_rank
ORDER BY
3 DESC, 1 DESC,2 DESC
LIMIT
10;
-- GROUP BY does not necessarily have to be inclusive of the partitioning
-- (here the window also partitions by max(event_type)),
-- but this query doesn't make much sense
SELECT
avg(user_id), my_rank
FROM
(
SELECT
user_id, rank() OVER my_win as my_rank
FROM
events_table
GROUP BY
user_id
WINDOW my_win AS (PARTITION BY user_id, max(event_type) ORDER BY count(*) DESC)
) as foo
GROUP BY
my_rank
ORDER BY
2 DESC, 1 DESC
LIMIT
10;
-- Using previously defined supported window function on distribution key
-- (w1 extends w, so it inherits PARTITION BY user_id)
SELECT * FROM (
SELECT
user_id, date_trunc('day', time) as time, sum(rank) OVER w2
FROM (
SELECT DISTINCT
user_id as user_id, time, rank() over w1
FROM users_table
WINDOW
w AS (PARTITION BY user_id),
w1 AS (w ORDER BY value_2, value_3)
) fab
WINDOW
w2 as (PARTITION BY user_id, time)
) a
ORDER BY
1, 2, 3 DESC
LIMIT
10;
-- test with reference table: the window partitions on columns from both
-- tables (user_id and id)
SELECT *
FROM
(
SELECT
DISTINCT user_id, it_name, count(id) OVER (PARTITION BY user_id, id)
FROM
users_table, users_ref_test_table
WHERE users_table.value_2=users_ref_test_table.k_no
) a
ORDER BY
1, 2, 3
LIMIT
20;
-- Group by has more columns than partition by
-- (GROUP BY user_id, value_1, value_2 vs. PARTITION BY user_id)
SELECT * FROM (
SELECT
DISTINCT user_id, SUM(value_2) OVER (PARTITION BY user_id)
FROM
users_table
GROUP BY
user_id, value_1, value_2
) a
ORDER BY
2 DESC, 1
LIMIT
10;
-- GROUP BY (user_id, value_1, value_2) has more columns than
-- PARTITION BY (user_id, value_1); the outer query takes max(sum) per user_id
SELECT user_id, max(sum) FROM (
SELECT
user_id, SUM(value_2) OVER (PARTITION BY user_id, value_1)
FROM
users_table
GROUP BY
user_id, value_1, value_2
) a
GROUP BY user_id ORDER BY
2 DESC,1
LIMIT
10;
-- Window functions with HAVING clause
-- (the same SELECT body as the subq view, used here as an inline subquery)
SELECT * FROM (
SELECT
DISTINCT user_id, rank() OVER (PARTITION BY user_id ORDER BY value_1)
FROM
users_table
GROUP BY
user_id, value_1 HAVING count(*) > 1
) a
ORDER BY
2 DESC, 1
LIMIT
10;
-- Window function in View works
-- (subq is the view created at the top of this file)
SELECT *
FROM
subq
ORDER BY
2 DESC, 1
LIMIT
10;
-- Window functions with UNION/UNION ALL work
-- (every UNION ALL branch partitions by user_id)
SELECT
max(avg)
FROM
(
(SELECT avg(value_3) over (partition by user_id), user_id FROM events_table where event_type IN (1, 2, 3, 4, 5))
UNION ALL
(SELECT avg(value_3) over (partition by user_id), user_id FROM events_table where event_type IN (6, 7, 8, 9, 10))
UNION ALL
(SELECT avg(value_3) over (partition by user_id), user_id FROM events_table where event_type IN (11, 12, 13, 14, 15))
UNION ALL
(SELECT avg(value_3) over (partition by user_id), user_id FROM events_table where event_type IN (16, 17, 18, 19, 20))
UNION ALL
(SELECT avg(value_3) over (partition by user_id), user_id FROM events_table where event_type IN (21, 22, 23, 24, 25))
UNION ALL
(SELECT avg(value_3) over (partition by user_id), user_id FROM events_table where event_type IN (26, 27, 28, 29, 30))
) b
GROUP BY user_id
ORDER BY 1 DESC
LIMIT 5;
-- nested UNIONs in which every window function partitions by user_id
SELECT *
FROM (
( SELECT user_id,
sum(counter)
FROM
(SELECT
user_id, sum(value_2) over (partition by user_id) AS counter
FROM
users_table
UNION
SELECT
user_id, sum(value_2) over (partition by user_id) AS counter
FROM
events_table) user_id_1
GROUP BY
user_id)
UNION
(SELECT
user_id, sum(counter)
FROM
(SELECT
user_id, sum(value_2) over (partition by user_id) AS counter
FROM
users_table
UNION
SELECT
user_id, sum(value_2) over (partition by user_id) AS counter
FROM
events_table) user_id_2
GROUP BY
user_id)) AS ftop
ORDER BY 2 DESC, 1 DESC
LIMIT 5;
-- Subquery in where with window function
-- (the subquery is correlated on user_id and its window partitions by user_id)
SELECT
user_id
FROM
users_table
WHERE
value_2 > 545 AND
value_2 < ALL (
SELECT
avg(value_3) OVER (PARTITION BY user_id)
FROM
events_table
WHERE
users_table.user_id = events_table.user_id
)
GROUP BY
user_id
ORDER BY
user_id DESC
LIMIT
3;
-- Some more nested queries: rank() is computed over a subquery that itself
-- contains a window function; both windows are partitioned by user_id
SELECT
user_id, rank, SUM(ABS(value_2 - value_3)) AS difference, COUNT(*) AS distinct_users
FROM (
SELECT
*, rank() OVER (PARTITION BY user_id ORDER BY value_2 DESC)
FROM (
SELECT
user_id, value_2, sum(value_3) OVER (PARTITION BY user_id, value_2) as value_3
FROM users_table
) AS A
) AS A
GROUP BY
user_id, rank
-- user_id breaks ties between groups that share the same difference and rank,
-- so the rows kept by LIMIT are deterministic
ORDER BY
difference DESC, rank DESC, user_id
LIMIT 20;
-- join of two subqueries that each compute a windowed sum partitioned by
-- user_id; the outer query reports the absolute difference per user_id
SELECT * FROM (
SELECT DISTINCT
f3.user_id, ABS(f2.sum - f3.sum)
FROM (
SELECT DISTINCT
user_id, sum(value_3) OVER (PARTITION BY user_id)
FROM
users_table
GROUP BY
user_id, value_3
) f3,
(
SELECT DISTINCT
user_id, sum(value_2) OVER (PARTITION BY user_id)
FROM
users_table
GROUP BY
user_id, value_2
) f2
WHERE
f3.user_id=f2.user_id
) a
-- user_id breaks ties between rows with the same abs value, so the rows kept
-- by LIMIT are deterministic
ORDER BY
abs DESC, user_id
LIMIT 10;
-- Partition by with aggregate functions. This query does not make much sense since the
-- result of aggregate function will be the same for every row in a partition and it is
-- not going to affect the group that the count function will work on.
-- (GROUP BY 1 groups on user_id, the first output column)
SELECT * FROM (
SELECT
user_id, COUNT(*) OVER (PARTITION BY user_id, MIN(value_2))
FROM
users_table
GROUP BY
1
) a
ORDER BY
1 DESC
LIMIT
5;
-- EXPLAIN the nested UNION query whose window functions all partition by user_id
EXPLAIN (COSTS FALSE, VERBOSE TRUE)
SELECT *
FROM (
( SELECT user_id,
sum(counter)
FROM
(SELECT
user_id, sum(value_2) over (partition by user_id) AS counter
FROM
users_table
UNION
SELECT
user_id, sum(value_2) over (partition by user_id) AS counter
FROM
events_table) user_id_1
GROUP BY
user_id)
UNION
(SELECT
user_id, sum(counter)
FROM
(SELECT
user_id, sum(value_2) over (partition by user_id) AS counter
FROM
users_table
UNION
SELECT
user_id, sum(value_2) over (partition by user_id) AS counter
FROM
events_table) user_id_2
GROUP BY
user_id)) AS ftop
ORDER BY 2 DESC, 1 DESC
LIMIT 5;
-- let's have some queries that Citus shouldn't push down
-- (this window is partitioned by event_type only)
SELECT
user_id, time, rnk
FROM
(
SELECT
*, rank() OVER my_win as rnk
FROM
events_table
WINDOW my_win AS (PARTITION BY event_type ORDER BY time DESC)
) as foo
ORDER BY
3 DESC, 1 DESC, 2 DESC
LIMIT
10;
-- user needs to supply partition by which should
-- include the distribution key
-- (here the window definition is empty)
SELECT
user_id, time, rnk
FROM
(
SELECT
*, rank() OVER my_win as rnk
FROM
events_table
WINDOW my_win AS ()
) as foo
ORDER BY
3 DESC, 1 DESC, 2 DESC
LIMIT
10;
-- user needs to supply partition by which should
-- include the distribution key
-- (here the window has an ORDER BY but no PARTITION BY)
SELECT
user_id, time, rnk
FROM
(
SELECT
*, rank() OVER my_win as rnk
FROM
events_table
WINDOW my_win AS (ORDER BY time DESC)
) as foo
ORDER BY
3 DESC, 1 DESC, 2 DESC
LIMIT
10;
-- w2 should not be pushed down
-- (it partitions by the expression users_table.user_id+1 rather than the column)
SELECT * FROM
(
SELECT
DISTINCT users_table.user_id, lag(users_table.user_id) OVER w1, rank() OVER w2
FROM
users_table, events_table
WHERE
users_table.user_id = events_table.user_id and
event_type < 25
WINDOW w1 AS (PARTITION BY users_table.user_id, events_table.event_type ORDER BY events_table.time),
w2 AS (PARTITION BY users_table.user_id+1, (events_table.value_2 % 25) ORDER BY events_table.time)
) as foo
ORDER BY 3 DESC, 1 DESC, 2 DESC NULLS LAST
LIMIT 10;
-- w2 should not be pushed down
-- (it has an ORDER BY but no PARTITION BY)
SELECT * FROM
(
SELECT
DISTINCT users_table.user_id, lag(users_table.user_id) OVER w1, rank() OVER w2
FROM
users_table, events_table
WHERE
users_table.user_id = events_table.user_id and
event_type < 25
WINDOW w1 AS (PARTITION BY users_table.user_id, events_table.event_type ORDER BY events_table.time),
w2 AS (ORDER BY events_table.time)
) as foo
ORDER BY
3 DESC, 1 DESC, 2 DESC NULLS LAST
LIMIT
10;
-- GROUP BY includes the partition key, but not the WINDOW function
-- (the window has an ORDER BY but no PARTITION BY)
SELECT
user_id, time, my_rank
FROM
(
SELECT
user_id, date_trunc('day', time) as time, rank() OVER my_win as my_rank
FROM
events_table
GROUP BY
user_id, date_trunc('day', time)
WINDOW my_win AS (ORDER BY avg(event_type))
) as foo
WHERE
my_rank > 125
ORDER BY
3 DESC, 1 DESC,2 DESC
LIMIT
10;
-- GROUP BY includes the partition key, but not the WINDOW function
-- (the window partitions by date_trunc('day', time) only)
SELECT
user_id, time, my_rank
FROM
(
SELECT
user_id, date_trunc('day', time) as time, rank() OVER my_win as my_rank
FROM
events_table
GROUP BY
user_id, date_trunc('day', time)
WINDOW my_win AS (PARTITION BY date_trunc('day', time) ORDER BY avg(event_type))
) as foo
WHERE
my_rank > 125
ORDER BY
3 DESC, 1 DESC,2 DESC
LIMIT
10;
-- Overriding window function but not supported
-- (w1 extends w, so it inherits PARTITION BY time, which does not include user_id)
SELECT * FROM (
SELECT
user_id, date_trunc('day', time) as time, sum(rank) OVER w2
FROM (
SELECT DISTINCT
user_id as user_id, time, rank() over w1
FROM
users_table
WINDOW
w AS (PARTITION BY time), w1 AS (w ORDER BY value_2, value_3)
) fab
WINDOW
w2 as (PARTITION BY user_id, time)
) a
ORDER BY
1,2,3;
-- Aggregate function on distribution column should error out
-- (the window partitions by sum(user_id), not by user_id itself)
SELECT * FROM (
SELECT
user_id, COUNT(*) OVER (PARTITION BY sum(user_id), MIN(value_2))
FROM
users_table
GROUP BY
user_id
) a
ORDER BY
1 DESC, 2 DESC;
-- test with reference table partitioned on only a column from reference table
-- (note: the two tables are listed here without any join condition)
SELECT *
FROM
(
SELECT
DISTINCT user_id, it_name, count(id) OVER (PARTITION BY id)
FROM
users_table, users_ref_test_table
) a
ORDER BY
1, 2, 3
LIMIT
20;
-- UNION ALL where only one branch (the last one, which partitions by event_type)
-- is not partitioned over the distribution column, which should not be allowed.
SELECT
max(avg)
FROM
(
(SELECT avg(value_3) over (partition by user_id), user_id FROM events_table where event_type IN (1, 2, 3, 4, 5))
UNION ALL
(SELECT avg(value_3) over (partition by user_id), user_id FROM events_table where event_type IN (6, 7, 8, 9, 10))
UNION ALL
(SELECT avg(value_3) over (partition by user_id), user_id FROM events_table where event_type IN (11, 12, 13, 14, 15))
UNION ALL
(SELECT avg(value_3) over (partition by user_id), user_id FROM events_table where event_type IN (16, 17, 18, 19, 20))
UNION ALL
(SELECT avg(value_3) over (partition by user_id), user_id FROM events_table where event_type IN (21, 22, 23, 24, 25))
UNION ALL
(SELECT avg(value_3) over (partition by event_type), user_id FROM events_table where event_type IN (26, 27, 28, 29, 30))
) b
GROUP BY user_id
ORDER BY 1 DESC
LIMIT 5;
-- UNION with only one subquery which has a partition on non-distribution column should
-- error out (the events_table branch of user_id_2 partitions by event_type)
SELECT *
FROM (
( SELECT user_id,
sum(counter)
FROM
(SELECT
user_id, sum(value_2) over (partition by user_id) AS counter
FROM
users_table
UNION
SELECT
user_id, sum(value_2) over (partition by user_id) AS counter
FROM
events_table) user_id_1
GROUP BY
user_id)
UNION
(SELECT
user_id, sum(counter)
FROM
(SELECT
user_id, sum(value_2) over (partition by user_id) AS counter
FROM
users_table
UNION
SELECT
user_id, sum(value_2) over (partition by event_type) AS counter
FROM
events_table) user_id_2
GROUP BY
user_id)) AS ftop
ORDER BY 2 DESC, 1 DESC
LIMIT 5;
-- test cleanup: drop the view created at the top of this file
DROP VIEW subq;