Add support for window functions on coordinator

Some refactoring:
Consolidate the expression that decides whether GROUP BY/HAVING are pushed down
Rename the early-computed pullUpIntermediateRows to hasNonDistributableAggregates
Create WorkerColumnName to handle formatting WORKER_COLUMN_FORMAT
Make SafeToPushdownWindowFunction ignore NULL StringInfo pointers
Fix a bug where SubqueryPushdownMultiNodeTree mutated the supplied Query;
	SafeToPushdownWindowFunction requires the original query, as it relies on rtable
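
As a user-visible sketch (table and column names hypothetical, not from this commit): before this change, a window function whose PARTITION BY does not cover the distribution column failed with "could not run distributed query because the window function that is used cannot be pushed down"; with coordinator support, the workers return their rows and the coordinator evaluates the window function.

    CREATE TABLE events (user_id int, event_time timestamptz, value int);
    SELECT create_distributed_table('events', 'user_id');

    -- no PARTITION BY on the distribution column: not pushable,
    -- now evaluated on the coordinator instead of erroring
    SELECT user_id, rank() OVER (ORDER BY value DESC)
    FROM events;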
pull/3537/head
Philip Dubé 2020-02-21 20:38:49 +00:00
parent 36ff150465
commit 720525cfda
42 changed files with 2159 additions and 911 deletions

View File

@ -37,7 +37,7 @@
* same connection since it may hold relevant locks or have uncommitted
* writes. In that case we "assign" the task to a connection by adding
* it to the task queue of specific connection (in
* AssignTasksToConnectionsOrWorkerPool ). Otherwise we consider the task
* AssignTasksToConnectionsOrWorkerPool). Otherwise we consider the task
* unassigned and add it to the task queue of a worker pool, which means
* that it can be executed over any connection in the pool.
*

View File

@ -33,7 +33,12 @@ static bool HasNonPartitionColumnDistinctAgg(List *targetEntryList, Node *having
static bool PartitionColumnInTableList(Var *column, List *tableNodeList);
static bool ShouldPullDistinctColumn(bool repartitionSubquery,
bool groupedByDisjointPartitionColumn,
bool hasNonPartitionColumnDistinctAgg);
bool hasNonPartitionColumnDistinctAgg,
bool onlyPushableWindowFunctions);
static bool CanPushDownGroupingAndHaving(bool pullUpIntermediateRows,
bool groupedByDisjointPartitionColumn,
bool hasWindowFuncs,
bool onlyPushableWindowFunctions);
/*
@ -42,8 +47,8 @@ static bool ShouldPullDistinctColumn(bool repartitionSubquery,
* value should be used in a read-only manner.
*/
ExtendedOpNodeProperties
BuildExtendedOpNodeProperties(MultiExtendedOp *extendedOpNode, bool
pullUpIntermediateRows)
BuildExtendedOpNodeProperties(MultiExtendedOp *extendedOpNode,
bool hasNonDistributableAggregates)
{
ExtendedOpNodeProperties extendedOpNodeProperties;
@ -51,6 +56,9 @@ BuildExtendedOpNodeProperties(MultiExtendedOp *extendedOpNode, bool
bool groupedByDisjointPartitionColumn =
GroupedByPartitionColumn((MultiNode *) extendedOpNode, extendedOpNode);
bool pullUpIntermediateRows = !groupedByDisjointPartitionColumn &&
hasNonDistributableAggregates;
bool repartitionSubquery = ExtendedOpNodeContainsRepartitionSubquery(extendedOpNode);
List *targetList = extendedOpNode->targetList;
@ -58,16 +66,17 @@ BuildExtendedOpNodeProperties(MultiExtendedOp *extendedOpNode, bool
bool hasNonPartitionColumnDistinctAgg =
HasNonPartitionColumnDistinctAgg(targetList, havingQual, tableNodeList);
bool pullDistinctColumns =
ShouldPullDistinctColumn(repartitionSubquery, groupedByDisjointPartitionColumn,
hasNonPartitionColumnDistinctAgg);
bool pushDownGroupingAndHaving =
CanPushDownGroupingAndHaving(pullUpIntermediateRows,
groupedByDisjointPartitionColumn,
extendedOpNode->hasWindowFuncs,
extendedOpNode->onlyPushableWindowFunctions);
/*
* TODO: Only window functions that can be pushed down reach here, thus,
* using hasWindowFuncs is safe for now. However, this should be fixed
* when we support pull-to-master window functions.
*/
bool pushDownWindowFunctions = extendedOpNode->hasWindowFuncs;
bool pullDistinctColumns =
ShouldPullDistinctColumn(repartitionSubquery,
groupedByDisjointPartitionColumn,
hasNonPartitionColumnDistinctAgg,
extendedOpNode->onlyPushableWindowFunctions);
extendedOpNodeProperties.groupedByDisjointPartitionColumn =
groupedByDisjointPartitionColumn;
@ -75,9 +84,11 @@ BuildExtendedOpNodeProperties(MultiExtendedOp *extendedOpNode, bool
extendedOpNodeProperties.hasNonPartitionColumnDistinctAgg =
hasNonPartitionColumnDistinctAgg;
extendedOpNodeProperties.pullDistinctColumns = pullDistinctColumns;
extendedOpNodeProperties.pushDownWindowFunctions = pushDownWindowFunctions;
extendedOpNodeProperties.pullUpIntermediateRows =
!groupedByDisjointPartitionColumn && pullUpIntermediateRows;
extendedOpNodeProperties.pullUpIntermediateRows = pullUpIntermediateRows;
extendedOpNodeProperties.hasWindowFuncs = extendedOpNode->hasWindowFuncs;
extendedOpNodeProperties.onlyPushableWindowFunctions =
extendedOpNode->onlyPushableWindowFunctions;
extendedOpNodeProperties.pushDownGroupingAndHaving = pushDownGroupingAndHaving;
return extendedOpNodeProperties;
}
@ -309,21 +320,52 @@ PartitionColumnInTableList(Var *column, List *tableNodeList)
static bool
ShouldPullDistinctColumn(bool repartitionSubquery,
bool groupedByDisjointPartitionColumn,
bool hasNonPartitionColumnDistinctAgg)
bool hasNonPartitionColumnDistinctAgg,
bool onlyPushableWindowFunctions)
{
if (repartitionSubquery)
{
return true;
}
if (groupedByDisjointPartitionColumn)
/* don't pull distinct columns when it can be pushed down */
if (onlyPushableWindowFunctions && groupedByDisjointPartitionColumn)
{
return false;
}
else if (!groupedByDisjointPartitionColumn && hasNonPartitionColumnDistinctAgg)
else if (hasNonPartitionColumnDistinctAgg)
{
return true;
}
return false;
}
/*
* CanPushDownGroupingAndHaving returns whether GROUP BY & HAVING should be
* pushed down to worker.
*/
static bool
CanPushDownGroupingAndHaving(bool pullUpIntermediateRows,
bool groupedByDisjointPartitionColumn,
bool hasWindowFuncs, bool onlyPushableWindowFunctions)
{
/* don't push down if we're pulling up */
if (pullUpIntermediateRows)
{
return false;
}
/*
* If grouped by a partition column we can push down the having qualifier.
*
* When a query with subquery is provided, we can't determine if
* groupedByDisjointPartitionColumn, therefore we also check if there is a
* window function too. If there is a window function we would know that it
* is safe to push down (i.e. it is partitioned on distribution column, and
* if there is a group by, it contains distribution column).
*/
return groupedByDisjointPartitionColumn ||
(hasWindowFuncs && onlyPushableWindowFunctions);
}
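
A hedged SQL sketch of the two branches of CanPushDownGroupingAndHaving, reusing the hypothetical events table from the sketch under the commit message: grouping by the distribution column keeps GROUP BY and HAVING on the workers, while a distinct aggregate on a non-distribution column under a different GROUP BY pulls intermediate rows up and groups on the coordinator.

    -- grouped by the distribution column: GROUP BY/HAVING pushed to workers
    SELECT user_id, count(*)
    FROM events
    GROUP BY user_id
    HAVING count(*) > 10;

    -- grouped by a non-distribution column with a non-distributable
    -- distinct aggregate: rows are pulled up and grouped on the coordinator
    SELECT event_time::date, count(DISTINCT value)
    FROM events
    GROUP BY 1;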

File diff suppressed because it is too large

View File

@ -206,7 +206,7 @@ FindNodeCheck(Node *node, bool (*check)(Node *))
* - Only a single RTE_RELATION exists, which means only a single table
* name is specified on the whole query
* - No sublinks exists in the subquery
* - No window functions in the subquery
* - No window functions exists in the subquery
*
* Note that the caller should still call DeferErrorIfUnsupportedSubqueryRepartition()
* to ensure that Citus supports the subquery. Also, this function is designed to run
@ -777,7 +777,7 @@ MultiNodeTree(Query *queryTree)
* distinguish between aggregates and expressions; and we address this later
* in the logical optimizer.
*/
MultiExtendedOp *extendedOpNode = MultiExtendedOpNode(queryTree);
MultiExtendedOp *extendedOpNode = MultiExtendedOpNode(queryTree, queryTree);
SetChild((MultiUnaryNode *) extendedOpNode, currentTopNode);
currentTopNode = (MultiNode *) extendedOpNode;
@ -923,7 +923,6 @@ DeferErrorIfQueryNotSupported(Query *queryTree)
{
char *errorMessage = NULL;
bool preconditionsSatisfied = true;
StringInfo errorInfo = NULL;
const char *errorHint = NULL;
const char *joinHint = "Consider joining tables on partition column and have "
"equal filter on joining columns.";
@ -942,18 +941,6 @@ DeferErrorIfQueryNotSupported(Query *queryTree)
errorHint = filterHint;
}
if (queryTree->hasWindowFuncs &&
!SafeToPushdownWindowFunction(queryTree, &errorInfo))
{
preconditionsSatisfied = false;
errorMessage = "could not run distributed query because the window "
"function that is used cannot be pushed down";
errorHint = "Window functions are supported in two ways. Either add "
"an equality filter on the distributed tables' partition "
"column or use the window functions with a PARTITION BY "
"clause containing the distribution column";
}
if (queryTree->setOperations)
{
preconditionsSatisfied = false;
@ -1826,7 +1813,7 @@ MultiProjectNode(List *targetEntryList)
/* Builds the extended operator node using fields from the given query tree. */
MultiExtendedOp *
MultiExtendedOpNode(Query *queryTree)
MultiExtendedOpNode(Query *queryTree, Query *originalQuery)
{
MultiExtendedOp *extendedOpNode = CitusMakeNode(MultiExtendedOp);
extendedOpNode->targetList = queryTree->targetList;
@ -1839,6 +1826,9 @@ MultiExtendedOpNode(Query *queryTree)
extendedOpNode->hasDistinctOn = queryTree->hasDistinctOn;
extendedOpNode->hasWindowFuncs = queryTree->hasWindowFuncs;
extendedOpNode->windowClause = queryTree->windowClause;
extendedOpNode->onlyPushableWindowFunctions =
!queryTree->hasWindowFuncs ||
SafeToPushdownWindowFunction(originalQuery, NULL);
return extendedOpNode;
}
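
The second argument lets MultiExtendedOpNode compute onlyPushableWindowFunctions against the original query's rtable. In SQL terms (hypothetical events table as before), SafeToPushdownWindowFunction accepts only windows partitioned on the distribution column:

    -- pushable: every window clause partitions on the distribution column
    SELECT user_id, avg(value) OVER (PARTITION BY user_id)
    FROM events;

    -- not pushable: PARTITION BY misses the distribution column, so
    -- onlyPushableWindowFunctions is false and the coordinator evaluates it
    SELECT user_id, avg(value) OVER (PARTITION BY event_time)
    FROM events;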

View File

@ -21,6 +21,7 @@
#include "distributed/multi_physical_planner.h"
#include "nodes/makefuncs.h"
#include "nodes/nodeFuncs.h"
#include "optimizer/clauses.h"
#include "optimizer/planner.h"
#include "rewrite/rewriteManip.h"
@ -206,10 +207,11 @@ BuildSelectStatementViaStdPlanner(Query *masterQuery, List *masterTargetList,
/* probably want to do this where we add sublinks to the master plan */
masterQuery->hasSubLinks = checkExprHasSubLink((Node *) masterQuery);
Assert(masterQuery->hasWindowFuncs == contain_window_function((Node *) masterQuery));
/*
* We will overwrite the alias of the rangetable which describes the custom scan.
* Idealy we would have set the correct column names and alias on the range table in
* Ideally we would have set the correct column names and alias on the range table in
* the master query already when we inserted the extra data container. This could be
* improved in the future.
*/

View File

@ -590,6 +590,8 @@ BuildJobQuery(MultiNode *multiNode, List *dependentJobList)
bool hasDistinctOn = false;
List *distinctClause = NIL;
bool isRepartitionJoin = false;
bool hasWindowFuncs = false;
List *windowClause = NIL;
/* we start building jobs from below the collect node */
Assert(!CitusIsA(multiNode, MultiCollect));
@ -640,6 +642,8 @@ BuildJobQuery(MultiNode *multiNode, List *dependentJobList)
targetList = copyObject(extendedOp->targetList);
distinctClause = extendedOp->distinctClause;
hasDistinctOn = extendedOp->hasDistinctOn;
hasWindowFuncs = extendedOp->hasWindowFuncs;
windowClause = extendedOp->windowClause;
}
else
{
@ -725,6 +729,8 @@ BuildJobQuery(MultiNode *multiNode, List *dependentJobList)
contain_aggs_of_level((Node *) havingQual, 0);
jobQuery->distinctClause = distinctClause;
jobQuery->hasDistinctOn = hasDistinctOn;
jobQuery->windowClause = windowClause;
jobQuery->hasWindowFuncs = hasWindowFuncs;
return jobQuery;
}
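
With hasWindowFuncs and windowClause now copied into the job query, a pushable window function is repeated verbatim in each per-shard task. Roughly (shard suffix hypothetical):

    -- coordinator query
    SELECT user_id, sum(value) OVER (PARTITION BY user_id) FROM events;

    -- per-shard worker task (sketch)
    SELECT user_id, sum(value) OVER (PARTITION BY user_id)
    FROM events_102008 events;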

View File

@ -107,8 +107,6 @@ bool
ShouldUseSubqueryPushDown(Query *originalQuery, Query *rewrittenQuery,
PlannerRestrictionContext *plannerRestrictionContext)
{
StringInfo errorMessage = NULL;
/*
* We check the existence of subqueries in FROM clause on the modified query
* given that if postgres already flattened the subqueries, MultiNodeTree()
@ -190,7 +188,7 @@ ShouldUseSubqueryPushDown(Query *originalQuery, Query *rewrittenQuery,
/* check if the query has a window function and it is safe to pushdown */
if (originalQuery->hasWindowFuncs &&
SafeToPushdownWindowFunction(originalQuery, &errorMessage))
SafeToPushdownWindowFunction(originalQuery, NULL))
{
return true;
}
@ -393,7 +391,7 @@ IsOuterJoinExpr(Node *node)
/*
* SafeToPushdownWindowFunction checks if the query with window function is supported.
* It returns the result accordingly and modifies the error detail.
* Returns the result accordingly and modifies errorDetail if non null.
*/
bool
SafeToPushdownWindowFunction(Query *query, StringInfo *errorDetail)
@ -411,20 +409,26 @@ SafeToPushdownWindowFunction(Query *query, StringInfo *errorDetail)
if (!windowClause->partitionClause)
{
*errorDetail = makeStringInfo();
appendStringInfoString(*errorDetail,
"Window functions without PARTITION BY on distribution "
"column is currently unsupported");
if (errorDetail)
{
*errorDetail = makeStringInfo();
appendStringInfoString(*errorDetail,
"Window functions without PARTITION BY on distribution "
"column is currently unsupported");
}
return false;
}
}
if (!WindowPartitionOnDistributionColumn(query))
{
*errorDetail = makeStringInfo();
appendStringInfoString(*errorDetail,
"Window functions with PARTITION BY list missing distribution "
"column is currently unsupported");
if (errorDetail)
{
*errorDetail = makeStringInfo();
appendStringInfoString(*errorDetail,
"Window functions with PARTITION BY list missing distribution "
"column is currently unsupported");
}
return false;
}
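
Callers that only need the boolean, such as ShouldUseSubqueryPushDown above, now pass NULL; paths that surface the message to the user still get the detail. An analogue of the retained regression case (hypothetical tables; error text as it appears later in this diff):

    -- window function inside a WHERE-clause subquery: still rejected
    SELECT user_id
    FROM events
    WHERE user_id IN (
        SELECT user_id
        FROM (SELECT user_id, rank() OVER (ORDER BY value) AS rnk
              FROM events) f
        WHERE rnk > 125);
    -- ERROR: cannot push down this subquery
    -- DETAIL: Window functions without PARTITION BY on distribution column
    --         is currently unsupported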
@ -1515,8 +1519,9 @@ HasRecurringTuples(Node *node, RecurringTuplesType *recurType)
* down to workers without invoking join order planner.
*/
static MultiNode *
SubqueryPushdownMultiNodeTree(Query *queryTree)
SubqueryPushdownMultiNodeTree(Query *originalQuery)
{
Query *queryTree = copyObject(originalQuery);
List *targetEntryList = queryTree->targetList;
MultiCollect *subqueryCollectNode = CitusMakeNode(MultiCollect);
@ -1616,7 +1621,7 @@ SubqueryPushdownMultiNodeTree(Query *queryTree)
* distinguish between aggregates and expressions; and we address this later
* in the logical optimizer.
*/
MultiExtendedOp *extendedOpNode = MultiExtendedOpNode(queryTree);
MultiExtendedOp *extendedOpNode = MultiExtendedOpNode(queryTree, originalQuery);
/*
* Postgres standard planner converts having qual node to a list of and
@ -1763,11 +1768,9 @@ CreateSubqueryTargetEntryList(List *exprList)
{
Node *expr = (Node *) lfirst(exprCell);
TargetEntry *newTargetEntry = makeNode(TargetEntry);
StringInfo exprNameString = makeStringInfo();
newTargetEntry->expr = (Expr *) copyObject(expr);
appendStringInfo(exprNameString, WORKER_COLUMN_FORMAT, resNo);
newTargetEntry->resname = exprNameString->data;
newTargetEntry->resname = WorkerColumnName(resNo);
newTargetEntry->resjunk = false;
newTargetEntry->resno = resNo;
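
WorkerColumnName centralizes the worker_column_%d naming that these target entries previously formatted inline. The generated subquery target list then reads roughly as follows (shard name hypothetical; the coordinator side references the same names, e.g. remote_scan.worker_column_3 in the HAVING plan later in this diff):

    -- worker-side target list produced by CreateSubqueryTargetEntryList
    SELECT user_id    AS worker_column_1,
           event_time AS worker_column_2,
           value      AS worker_column_3
    FROM events_102008 events;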

View File

@ -1651,8 +1651,8 @@ RelationRestrictionPartitionKeyIndex(RelationRestriction *relationRestriction)
partitionKeyTargetAttrIndex++;
if (!targetEntry->resjunk &&
IsPartitionColumn(targetExpression, relationPlannerParseQuery) &&
IsA(targetExpression, Var))
IsA(targetExpression, Var) &&
IsPartitionColumn(targetExpression, relationPlannerParseQuery))
{
Var *targetColumn = (Var *) targetExpression;

View File

@ -323,6 +323,9 @@ OutMultiExtendedOp(OUTFUNC_ARGS)
WRITE_NODE_FIELD(havingQual);
WRITE_BOOL_FIELD(hasDistinctOn);
WRITE_NODE_FIELD(distinctClause);
WRITE_BOOL_FIELD(hasWindowFuncs);
WRITE_BOOL_FIELD(onlyPushableWindowFunctions);
WRITE_NODE_FIELD(windowClause);
OutMultiUnaryNodeFields(str, (const MultiUnaryNode *) node);
}

View File

@ -27,13 +27,15 @@ typedef struct ExtendedOpNodeProperties
bool repartitionSubquery;
bool hasNonPartitionColumnDistinctAgg;
bool pullDistinctColumns;
bool pushDownWindowFunctions;
bool hasWindowFuncs;
bool onlyPushableWindowFunctions;
bool pullUpIntermediateRows;
bool pushDownGroupingAndHaving;
} ExtendedOpNodeProperties;
extern ExtendedOpNodeProperties BuildExtendedOpNodeProperties(
MultiExtendedOp *extendedOpNode, bool pullUpIntermediateRows);
MultiExtendedOp *extendedOpNode, bool hasNonDistributableAggregates);
#endif /* EXTENDED_OP_NODE_UTILS_H_ */

View File

@ -162,7 +162,7 @@ extern bool ExtractQueryWalker(Node *node, List **queryList);
extern bool IsPartitionColumn(Expr *columnExpression, Query *query);
extern void FindReferencedTableColumn(Expr *columnExpression, List *parentQueryList,
Query *query, Oid *relationId, Var **column);
extern char * WorkerColumnName(AttrNumber resno);
extern bool IsGroupBySubsetOfDistinct(List *groupClauses, List *distinctClauses);
#endif /* MULTI_LOGICAL_OPTIMIZER_H */

View File

@ -176,9 +176,10 @@ typedef struct MultiExtendedOp
Node *limitOffset;
Node *havingQual;
List *distinctClause;
List *windowClause;
bool hasDistinctOn;
bool hasWindowFuncs;
List *windowClause;
bool onlyPushableWindowFunctions;
} MultiExtendedOp;
@ -219,7 +220,7 @@ extern List * pull_var_clause_default(Node *node);
extern bool OperatorImplementsEquality(Oid opno);
extern DeferredErrorMessage * DeferErrorIfUnsupportedClause(List *clauseList);
extern MultiProject * MultiProjectNode(List *targetEntryList);
extern MultiExtendedOp * MultiExtendedOpNode(Query *queryTree);
extern MultiExtendedOp * MultiExtendedOpNode(Query *queryTree, Query *originalQuery);
extern DeferredErrorMessage * DeferErrorIfUnsupportedSubqueryRepartition(Query *
subqueryTree);
extern MultiNode * MultiNodeTree(Query *queryTree);

View File

@ -89,17 +89,118 @@ ORDER BY 1;
7 | 13
(2 rows)
-- These are going to be supported after window function support
SELECT day, hll_cardinality(hll_union_agg(unique_users) OVER seven_days)
FROM daily_uniques
WINDOW seven_days AS (ORDER BY day ASC ROWS 6 PRECEDING);
ERROR: could not run distributed query because the window function that is used cannot be pushed down
HINT: Window functions are supported in two ways. Either add an equality filter on the distributed tables' partition column or use the window functions with a PARTITION BY clause containing the distribution column
WINDOW seven_days AS (ORDER BY day ASC ROWS 6 PRECEDING)
ORDER BY 1;
day | hll_cardinality
---------------------------------------------------------------------
05-24-2018 | 19
05-25-2018 | 19
05-26-2018 | 19
05-27-2018 | 19
05-28-2018 | 19
05-29-2018 | 19
05-30-2018 | 19
05-31-2018 | 19
06-01-2018 | 19
06-02-2018 | 19
06-03-2018 | 19
06-04-2018 | 19
06-05-2018 | 19
06-06-2018 | 19
06-07-2018 | 19
06-08-2018 | 19
06-09-2018 | 19
06-10-2018 | 19
06-11-2018 | 19
06-12-2018 | 19
06-13-2018 | 19
06-14-2018 | 19
06-15-2018 | 19
06-16-2018 | 19
06-17-2018 | 19
06-18-2018 | 19
06-19-2018 | 19
06-20-2018 | 19
06-21-2018 | 19
06-22-2018 | 19
06-23-2018 | 19
06-24-2018 | 19
06-25-2018 | 19
06-26-2018 | 19
06-27-2018 | 19
06-28-2018 | 19
06-29-2018 | 19
06-30-2018 | 19
07-01-2018 | 13
07-02-2018 | 13
07-03-2018 | 13
07-04-2018 | 13
07-05-2018 | 13
07-06-2018 | 13
07-07-2018 | 13
07-08-2018 | 13
07-09-2018 | 13
07-10-2018 | 13
(48 rows)
SELECT day, (hll_cardinality(hll_union_agg(unique_users) OVER two_days)) - hll_cardinality(unique_users) AS lost_uniques
FROM daily_uniques
WINDOW two_days AS (ORDER BY day ASC ROWS 1 PRECEDING);
ERROR: could not run distributed query because the window function that is used cannot be pushed down
HINT: Window functions are supported in two ways. Either add an equality filter on the distributed tables' partition column or use the window functions with a PARTITION BY clause containing the distribution column
WINDOW two_days AS (ORDER BY day ASC ROWS 1 PRECEDING)
ORDER BY 1;
day | lost_uniques
---------------------------------------------------------------------
05-24-2018 | 0
05-25-2018 | 0
05-26-2018 | 0
05-27-2018 | 0
05-28-2018 | 0
05-29-2018 | 0
05-30-2018 | 0
05-31-2018 | 0
06-01-2018 | 0
06-02-2018 | 0
06-03-2018 | 0
06-04-2018 | 0
06-05-2018 | 0
06-06-2018 | 0
06-07-2018 | 0
06-08-2018 | 0
06-09-2018 | 0
06-10-2018 | 0
06-11-2018 | 0
06-12-2018 | 0
06-13-2018 | 0
06-14-2018 | 0
06-15-2018 | 0
06-16-2018 | 0
06-17-2018 | 0
06-18-2018 | 0
06-19-2018 | 0
06-20-2018 | 0
06-21-2018 | 0
06-22-2018 | 0
06-23-2018 | 0
06-24-2018 | 0
06-25-2018 | 6
06-26-2018 | 0
06-27-2018 | 0
06-28-2018 | 0
06-29-2018 | 0
06-30-2018 | 0
07-01-2018 | 0
07-02-2018 | 0
07-03-2018 | 0
07-04-2018 | 0
07-05-2018 | 0
07-06-2018 | 0
07-07-2018 | 0
07-08-2018 | 0
07-09-2018 | 0
07-10-2018 | 0
(48 rows)
-- Test disabling hash_agg on coordinator query
SET citus.explain_all_tasks to true;
SET hll.force_groupagg to OFF;
@ -514,7 +615,6 @@ SELECT (topn(topn_add_agg(user_id::text), 10)).*
FROM customer_reviews
ORDER BY 2 DESC, 1;
ERROR: set-valued function called in context that cannot accept a set
-- The following is going to be supported after window function support
SELECT day, (topn(agg, 10)).*
FROM (
SELECT day, topn_union_agg(reviewers) OVER seven_days AS agg
@ -523,14 +623,25 @@ FROM (
)a
ORDER BY 3 DESC, 1, 2
LIMIT 10;
ERROR: could not run distributed query because the window function that is used cannot be pushed down
HINT: Window functions are supported in two ways. Either add an equality filter on the distributed tables' partition column or use the window functions with a PARTITION BY clause containing the distribution column
day | item | frequency
---------------------------------------------------------------------
06-16-2018 | 1 | 1736
06-16-2018 | 2 | 1736
06-17-2018 | 1 | 1736
06-17-2018 | 2 | 1736
06-18-2018 | 1 | 1736
06-18-2018 | 2 | 1736
06-19-2018 | 1 | 1736
06-19-2018 | 2 | 1736
06-20-2018 | 1 | 1736
06-20-2018 | 2 | 1736
(10 rows)
SELECT day, (topn(topn_add_agg(user_id::text) OVER seven_days, 10)).*
FROM customer_reviews
WINDOW seven_days AS (ORDER BY day ASC ROWS 6 PRECEDING)
ORDER BY 3 DESC, 1, 2
LIMIT 10;
ERROR: could not run distributed query because the window function that is used cannot be pushed down
HINT: Window functions are supported in two ways. Either add an equality filter on the distributed tables' partition column or use the window functions with a PARTITION BY clause containing the distribution column
ERROR: set-valued function called in context that cannot accept a set
DROP TABLE customer_reviews;
DROP TABLE popular_reviewer;

View File

@ -66,14 +66,15 @@ WHERE day >= '2018-06-23' AND day <= '2018-07-01'
GROUP BY 1
ORDER BY 1;
ERROR: relation "daily_uniques" does not exist
-- These are going to be supported after window function support
SELECT day, hll_cardinality(hll_union_agg(unique_users) OVER seven_days)
FROM daily_uniques
WINDOW seven_days AS (ORDER BY day ASC ROWS 6 PRECEDING);
WINDOW seven_days AS (ORDER BY day ASC ROWS 6 PRECEDING)
ORDER BY 1;
ERROR: relation "daily_uniques" does not exist
SELECT day, (hll_cardinality(hll_union_agg(unique_users) OVER two_days)) - hll_cardinality(unique_users) AS lost_uniques
FROM daily_uniques
WINDOW two_days AS (ORDER BY day ASC ROWS 1 PRECEDING);
WINDOW two_days AS (ORDER BY day ASC ROWS 1 PRECEDING)
ORDER BY 1;
ERROR: relation "daily_uniques" does not exist
-- Test disabling hash_agg on coordinator query
SET citus.explain_all_tasks to true;
@ -242,7 +243,6 @@ FROM customer_reviews
ORDER BY 2 DESC, 1;
ERROR: function topn_add_agg(text) does not exist
HINT: No function matches the given name and argument types. You might need to add explicit type casts.
-- The following is going to be supported after window function support
SELECT day, (topn(agg, 10)).*
FROM (
SELECT day, topn_union_agg(reviewers) OVER seven_days AS agg

View File

@ -9,7 +9,6 @@ AS create_cmd FROM pg_available_extensions()
WHERE name = 'hll'
\gset
:create_cmd;
ERROR: extension "hll" already exists
SET citus.shard_count TO 4;
set citus.coordinator_aggregation_strategy to 'disabled';
CREATE TABLE raw_table (day date, user_id int);
@ -89,17 +88,118 @@ ORDER BY 1;
7 | 13
(2 rows)
-- These are going to be supported after window function support
SELECT day, hll_cardinality(hll_union_agg(unique_users) OVER seven_days)
FROM daily_uniques
WINDOW seven_days AS (ORDER BY day ASC ROWS 6 PRECEDING);
ERROR: could not run distributed query because the window function that is used cannot be pushed down
HINT: Window functions are supported in two ways. Either add an equality filter on the distributed tables' partition column or use the window functions with a PARTITION BY clause containing the distribution column
WINDOW seven_days AS (ORDER BY day ASC ROWS 6 PRECEDING)
ORDER BY 1;
day | hll_cardinality
---------------------------------------------------------------------
05-24-2018 | 19
05-25-2018 | 19
05-26-2018 | 19
05-27-2018 | 19
05-28-2018 | 19
05-29-2018 | 19
05-30-2018 | 19
05-31-2018 | 19
06-01-2018 | 19
06-02-2018 | 19
06-03-2018 | 19
06-04-2018 | 19
06-05-2018 | 19
06-06-2018 | 19
06-07-2018 | 19
06-08-2018 | 19
06-09-2018 | 19
06-10-2018 | 19
06-11-2018 | 19
06-12-2018 | 19
06-13-2018 | 19
06-14-2018 | 19
06-15-2018 | 19
06-16-2018 | 19
06-17-2018 | 19
06-18-2018 | 19
06-19-2018 | 19
06-20-2018 | 19
06-21-2018 | 19
06-22-2018 | 19
06-23-2018 | 19
06-24-2018 | 19
06-25-2018 | 19
06-26-2018 | 19
06-27-2018 | 19
06-28-2018 | 19
06-29-2018 | 19
06-30-2018 | 19
07-01-2018 | 13
07-02-2018 | 13
07-03-2018 | 13
07-04-2018 | 13
07-05-2018 | 13
07-06-2018 | 13
07-07-2018 | 13
07-08-2018 | 13
07-09-2018 | 13
07-10-2018 | 13
(48 rows)
SELECT day, (hll_cardinality(hll_union_agg(unique_users) OVER two_days)) - hll_cardinality(unique_users) AS lost_uniques
FROM daily_uniques
WINDOW two_days AS (ORDER BY day ASC ROWS 1 PRECEDING);
ERROR: could not run distributed query because the window function that is used cannot be pushed down
HINT: Window functions are supported in two ways. Either add an equality filter on the distributed tables' partition column or use the window functions with a PARTITION BY clause containing the distribution column
WINDOW two_days AS (ORDER BY day ASC ROWS 1 PRECEDING)
ORDER BY 1;
day | lost_uniques
---------------------------------------------------------------------
05-24-2018 | 0
05-25-2018 | 0
05-26-2018 | 0
05-27-2018 | 0
05-28-2018 | 0
05-29-2018 | 0
05-30-2018 | 0
05-31-2018 | 0
06-01-2018 | 0
06-02-2018 | 0
06-03-2018 | 0
06-04-2018 | 0
06-05-2018 | 0
06-06-2018 | 0
06-07-2018 | 0
06-08-2018 | 0
06-09-2018 | 0
06-10-2018 | 0
06-11-2018 | 0
06-12-2018 | 0
06-13-2018 | 0
06-14-2018 | 0
06-15-2018 | 0
06-16-2018 | 0
06-17-2018 | 0
06-18-2018 | 0
06-19-2018 | 0
06-20-2018 | 0
06-21-2018 | 0
06-22-2018 | 0
06-23-2018 | 0
06-24-2018 | 0
06-25-2018 | 6
06-26-2018 | 0
06-27-2018 | 0
06-28-2018 | 0
06-29-2018 | 0
06-30-2018 | 0
07-01-2018 | 0
07-02-2018 | 0
07-03-2018 | 0
07-04-2018 | 0
07-05-2018 | 0
07-06-2018 | 0
07-07-2018 | 0
07-08-2018 | 0
07-09-2018 | 0
07-10-2018 | 0
(48 rows)
-- Test disabling hash_agg on coordinator query
SET citus.explain_all_tasks to true;
SET hll.force_groupagg to OFF;
@ -109,34 +209,32 @@ SELECT
FROM
daily_uniques
GROUP BY(1);
QUERY PLAN
QUERY PLAN
---------------------------------------------------------------------
HashAggregate
Group Key: remote_scan.day
-> Custom Scan (Citus Adaptive)
Task Count: 4
Tasks Shown: All
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> HashAggregate
Group Key: day
-> Seq Scan on daily_uniques_xxxxxxx daily_uniques
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> HashAggregate
Group Key: day
-> Seq Scan on daily_uniques_xxxxxxx daily_uniques
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> HashAggregate
Group Key: day
-> Seq Scan on daily_uniques_xxxxxxx daily_uniques
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> HashAggregate
Group Key: day
-> Seq Scan on daily_uniques_xxxxxxx daily_uniques
(25 rows)
Custom Scan (Citus Adaptive)
Task Count: 4
Tasks Shown: All
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> HashAggregate
Group Key: day
-> Seq Scan on daily_uniques_xxxxxxx daily_uniques
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> HashAggregate
Group Key: day
-> Seq Scan on daily_uniques_xxxxxxx daily_uniques
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> HashAggregate
Group Key: day
-> Seq Scan on daily_uniques_xxxxxxx daily_uniques
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> HashAggregate
Group Key: day
-> Seq Scan on daily_uniques_xxxxxxx daily_uniques
(23 rows)
SET hll.force_groupagg to ON;
EXPLAIN(COSTS OFF)
@ -145,36 +243,32 @@ SELECT
FROM
daily_uniques
GROUP BY(1);
QUERY PLAN
QUERY PLAN
---------------------------------------------------------------------
GroupAggregate
Group Key: remote_scan.day
-> Sort
Sort Key: remote_scan.day
-> Custom Scan (Citus Adaptive)
Task Count: 4
Tasks Shown: All
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> HashAggregate
Group Key: day
-> Seq Scan on daily_uniques_xxxxxxx daily_uniques
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> HashAggregate
Group Key: day
-> Seq Scan on daily_uniques_xxxxxxx daily_uniques
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> HashAggregate
Group Key: day
-> Seq Scan on daily_uniques_xxxxxxx daily_uniques
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> HashAggregate
Group Key: day
-> Seq Scan on daily_uniques_xxxxxxx daily_uniques
(27 rows)
Custom Scan (Citus Adaptive)
Task Count: 4
Tasks Shown: All
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> HashAggregate
Group Key: day
-> Seq Scan on daily_uniques_xxxxxxx daily_uniques
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> HashAggregate
Group Key: day
-> Seq Scan on daily_uniques_xxxxxxx daily_uniques
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> HashAggregate
Group Key: day
-> Seq Scan on daily_uniques_xxxxxxx daily_uniques
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> HashAggregate
Group Key: day
-> Seq Scan on daily_uniques_xxxxxxx daily_uniques
(23 rows)
-- Test disabling hash_agg with operator on coordinator query
SET hll.force_groupagg to OFF;
@ -184,34 +278,32 @@ SELECT
FROM
daily_uniques
GROUP BY(1);
QUERY PLAN
QUERY PLAN
---------------------------------------------------------------------
HashAggregate
Group Key: remote_scan.day
-> Custom Scan (Citus Adaptive)
Task Count: 4
Tasks Shown: All
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> HashAggregate
Group Key: day
-> Seq Scan on daily_uniques_xxxxxxx daily_uniques
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> HashAggregate
Group Key: day
-> Seq Scan on daily_uniques_xxxxxxx daily_uniques
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> HashAggregate
Group Key: day
-> Seq Scan on daily_uniques_xxxxxxx daily_uniques
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> HashAggregate
Group Key: day
-> Seq Scan on daily_uniques_xxxxxxx daily_uniques
(25 rows)
Custom Scan (Citus Adaptive)
Task Count: 4
Tasks Shown: All
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> HashAggregate
Group Key: day
-> Seq Scan on daily_uniques_xxxxxxx daily_uniques
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> HashAggregate
Group Key: day
-> Seq Scan on daily_uniques_xxxxxxx daily_uniques
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> HashAggregate
Group Key: day
-> Seq Scan on daily_uniques_xxxxxxx daily_uniques
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> HashAggregate
Group Key: day
-> Seq Scan on daily_uniques_xxxxxxx daily_uniques
(23 rows)
SET hll.force_groupagg to ON;
EXPLAIN(COSTS OFF)
@ -220,36 +312,32 @@ SELECT
FROM
daily_uniques
GROUP BY(1);
QUERY PLAN
QUERY PLAN
---------------------------------------------------------------------
GroupAggregate
Group Key: remote_scan.day
-> Sort
Sort Key: remote_scan.day
-> Custom Scan (Citus Adaptive)
Task Count: 4
Tasks Shown: All
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> HashAggregate
Group Key: day
-> Seq Scan on daily_uniques_xxxxxxx daily_uniques
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> HashAggregate
Group Key: day
-> Seq Scan on daily_uniques_xxxxxxx daily_uniques
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> HashAggregate
Group Key: day
-> Seq Scan on daily_uniques_xxxxxxx daily_uniques
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> HashAggregate
Group Key: day
-> Seq Scan on daily_uniques_xxxxxxx daily_uniques
(27 rows)
Custom Scan (Citus Adaptive)
Task Count: 4
Tasks Shown: All
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> HashAggregate
Group Key: day
-> Seq Scan on daily_uniques_xxxxxxx daily_uniques
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> HashAggregate
Group Key: day
-> Seq Scan on daily_uniques_xxxxxxx daily_uniques
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> HashAggregate
Group Key: day
-> Seq Scan on daily_uniques_xxxxxxx daily_uniques
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> HashAggregate
Group Key: day
-> Seq Scan on daily_uniques_xxxxxxx daily_uniques
(23 rows)
-- Test disabling hash_agg with expression on coordinator query
SET hll.force_groupagg to OFF;
@ -259,34 +347,32 @@ SELECT
FROM
daily_uniques
GROUP BY(1);
QUERY PLAN
QUERY PLAN
---------------------------------------------------------------------
HashAggregate
Group Key: remote_scan.day
-> Custom Scan (Citus Adaptive)
Task Count: 4
Tasks Shown: All
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> HashAggregate
Group Key: day
-> Seq Scan on daily_uniques_xxxxxxx daily_uniques
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> HashAggregate
Group Key: day
-> Seq Scan on daily_uniques_xxxxxxx daily_uniques
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> HashAggregate
Group Key: day
-> Seq Scan on daily_uniques_xxxxxxx daily_uniques
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> HashAggregate
Group Key: day
-> Seq Scan on daily_uniques_xxxxxxx daily_uniques
(25 rows)
Custom Scan (Citus Adaptive)
Task Count: 4
Tasks Shown: All
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> HashAggregate
Group Key: day
-> Seq Scan on daily_uniques_xxxxxxx daily_uniques
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> HashAggregate
Group Key: day
-> Seq Scan on daily_uniques_xxxxxxx daily_uniques
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> HashAggregate
Group Key: day
-> Seq Scan on daily_uniques_xxxxxxx daily_uniques
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> HashAggregate
Group Key: day
-> Seq Scan on daily_uniques_xxxxxxx daily_uniques
(23 rows)
SET hll.force_groupagg to ON;
EXPLAIN(COSTS OFF)
@ -295,36 +381,32 @@ SELECT
FROM
daily_uniques
GROUP BY(1);
QUERY PLAN
QUERY PLAN
---------------------------------------------------------------------
GroupAggregate
Group Key: remote_scan.day
-> Sort
Sort Key: remote_scan.day
-> Custom Scan (Citus Adaptive)
Task Count: 4
Tasks Shown: All
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> HashAggregate
Group Key: day
-> Seq Scan on daily_uniques_xxxxxxx daily_uniques
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> HashAggregate
Group Key: day
-> Seq Scan on daily_uniques_xxxxxxx daily_uniques
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> HashAggregate
Group Key: day
-> Seq Scan on daily_uniques_xxxxxxx daily_uniques
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> HashAggregate
Group Key: day
-> Seq Scan on daily_uniques_xxxxxxx daily_uniques
(27 rows)
Custom Scan (Citus Adaptive)
Task Count: 4
Tasks Shown: All
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> HashAggregate
Group Key: day
-> Seq Scan on daily_uniques_xxxxxxx daily_uniques
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> HashAggregate
Group Key: day
-> Seq Scan on daily_uniques_xxxxxxx daily_uniques
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> HashAggregate
Group Key: day
-> Seq Scan on daily_uniques_xxxxxxx daily_uniques
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> HashAggregate
Group Key: day
-> Seq Scan on daily_uniques_xxxxxxx daily_uniques
(23 rows)
-- Test disabling hash_agg with having
SET hll.force_groupagg to OFF;
@ -334,34 +416,32 @@ SELECT
FROM
daily_uniques
GROUP BY(1);
QUERY PLAN
QUERY PLAN
---------------------------------------------------------------------
HashAggregate
Group Key: remote_scan.day
-> Custom Scan (Citus Adaptive)
Task Count: 4
Tasks Shown: All
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> HashAggregate
Group Key: day
-> Seq Scan on daily_uniques_xxxxxxx daily_uniques
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> HashAggregate
Group Key: day
-> Seq Scan on daily_uniques_xxxxxxx daily_uniques
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> HashAggregate
Group Key: day
-> Seq Scan on daily_uniques_xxxxxxx daily_uniques
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> HashAggregate
Group Key: day
-> Seq Scan on daily_uniques_xxxxxxx daily_uniques
(25 rows)
Custom Scan (Citus Adaptive)
Task Count: 4
Tasks Shown: All
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> HashAggregate
Group Key: day
-> Seq Scan on daily_uniques_xxxxxxx daily_uniques
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> HashAggregate
Group Key: day
-> Seq Scan on daily_uniques_xxxxxxx daily_uniques
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> HashAggregate
Group Key: day
-> Seq Scan on daily_uniques_xxxxxxx daily_uniques
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> HashAggregate
Group Key: day
-> Seq Scan on daily_uniques_xxxxxxx daily_uniques
(23 rows)
SET hll.force_groupagg to ON;
EXPLAIN(COSTS OFF)
@ -371,49 +451,36 @@ FROM
daily_uniques
GROUP BY(1)
HAVING hll_cardinality(hll_union_agg(unique_users)) > 1;
QUERY PLAN
QUERY PLAN
---------------------------------------------------------------------
GroupAggregate
Group Key: remote_scan.day
Filter: (hll_cardinality(hll_union_agg(remote_scan.worker_column_3)) > '1'::double precision)
-> Sort
Sort Key: remote_scan.day
-> Custom Scan (Citus Adaptive)
Task Count: 4
Tasks Shown: All
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> GroupAggregate
Group Key: day
Filter: (hll_cardinality(hll_union_agg(unique_users)) > '1'::double precision)
-> Sort
Sort Key: day
-> Seq Scan on daily_uniques_xxxxxxx daily_uniques
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> GroupAggregate
Group Key: day
Filter: (hll_cardinality(hll_union_agg(unique_users)) > '1'::double precision)
-> Sort
Sort Key: day
-> Seq Scan on daily_uniques_xxxxxxx daily_uniques
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> GroupAggregate
Group Key: day
Filter: (hll_cardinality(hll_union_agg(unique_users)) > '1'::double precision)
-> Sort
Sort Key: day
-> Seq Scan on daily_uniques_xxxxxxx daily_uniques
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> GroupAggregate
Group Key: day
Filter: (hll_cardinality(hll_union_agg(unique_users)) > '1'::double precision)
-> Sort
Sort Key: day
-> Seq Scan on daily_uniques_xxxxxxx daily_uniques
(40 rows)
Custom Scan (Citus Adaptive)
Task Count: 4
Tasks Shown: All
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> HashAggregate
Group Key: day
Filter: (hll_cardinality(hll_union_agg(unique_users)) > '1'::double precision)
-> Seq Scan on daily_uniques_xxxxxxx daily_uniques
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> HashAggregate
Group Key: day
Filter: (hll_cardinality(hll_union_agg(unique_users)) > '1'::double precision)
-> Seq Scan on daily_uniques_xxxxxxx daily_uniques
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> HashAggregate
Group Key: day
Filter: (hll_cardinality(hll_union_agg(unique_users)) > '1'::double precision)
-> Seq Scan on daily_uniques_xxxxxxx daily_uniques
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> HashAggregate
Group Key: day
Filter: (hll_cardinality(hll_union_agg(unique_users)) > '1'::double precision)
-> Seq Scan on daily_uniques_xxxxxxx daily_uniques
(27 rows)
DROP TABLE raw_table;
DROP TABLE daily_uniques;
@ -542,35 +609,11 @@ SELECT (topn(topn_union_agg(reviewers), 10)).*
FROM popular_reviewer
WHERE day >= '2018-05-24'::date AND day <= '2018-05-31'::date
ORDER BY 2 DESC, 1;
item | frequency
---------------------------------------------------------------------
1 | 1240
2 | 1240
0 | 992
3 | 992
4 | 992
5 | 992
6 | 992
(7 rows)
ERROR: set-valued function called in context that cannot accept a set
SELECT (topn(topn_add_agg(user_id::text), 10)).*
FROM customer_reviews
ORDER BY 2 DESC, 1;
item | frequency
---------------------------------------------------------------------
1 | 7843
2 | 7843
3 | 6851
4 | 6851
0 | 5890
5 | 5890
6 | 5890
7 | 1922
8 | 1922
9 | 1922
(10 rows)
-- The following is going to be supported after window function support
ERROR: set-valued function called in context that cannot accept a set
SELECT day, (topn(agg, 10)).*
FROM (
SELECT day, topn_union_agg(reviewers) OVER seven_days AS agg
@ -579,14 +622,25 @@ FROM (
)a
ORDER BY 3 DESC, 1, 2
LIMIT 10;
ERROR: could not run distributed query because the window function that is used cannot be pushed down
HINT: Window functions are supported in two ways. Either add an equality filter on the distributed tables' partition column or use the window functions with a PARTITION BY clause containing the distribution column
day | item | frequency
---------------------------------------------------------------------
06-16-2018 | 1 | 1736
06-16-2018 | 2 | 1736
06-17-2018 | 1 | 1736
06-17-2018 | 2 | 1736
06-18-2018 | 1 | 1736
06-18-2018 | 2 | 1736
06-19-2018 | 1 | 1736
06-19-2018 | 2 | 1736
06-20-2018 | 1 | 1736
06-20-2018 | 2 | 1736
(10 rows)
SELECT day, (topn(topn_add_agg(user_id::text) OVER seven_days, 10)).*
FROM customer_reviews
WINDOW seven_days AS (ORDER BY day ASC ROWS 6 PRECEDING)
ORDER BY 3 DESC, 1, 2
LIMIT 10;
ERROR: could not run distributed query because the window function that is used cannot be pushed down
HINT: Window functions are supported in two ways. Either add an equality filter on the distributed tables' partition column or use the window functions with a PARTITION BY clause containing the distribution column
ERROR: set-valued function called in context that cannot accept a set
DROP TABLE customer_reviews;
DROP TABLE popular_reviewer;

View File

@ -133,16 +133,11 @@ FROM
DEBUG: CTE joined_stats_cte_1 is going to be inlined via distributed planning
DEBUG: CTE joined_stats_cte_2 is going to be inlined via distributed planning
DEBUG: generating subplan XXX_1 for CTE accounts_cte: SELECT id AS account_id FROM local_shard_execution.accounts
DEBUG: generating subplan XXX_1 for CTE accounts_cte: SELECT id AS account_id FROM local_shard_execution.accounts
DEBUG: generating subplan XXX_2 for CTE joined_stats_cte_1: SELECT stats.spent, stats.account_id FROM (local_shard_execution.stats JOIN (SELECT intermediate_result.account_id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(account_id text)) accounts_cte USING (account_id))
DEBUG: generating subplan XXX_3 for CTE joined_stats_cte_2: SELECT joined_stats_cte_1.spent, joined_stats_cte_1.account_id FROM ((SELECT intermediate_result.spent, intermediate_result.account_id FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(spent integer, account_id text)) joined_stats_cte_1 JOIN (SELECT intermediate_result.account_id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(account_id text)) accounts_cte USING (account_id))
DEBUG: generating subplan XXX_4 for subquery SELECT sum(joined_stats_cte_2.spent) OVER (PARTITION BY COALESCE(accounts_cte.account_id, NULL::text)) AS sum FROM ((SELECT intermediate_result.account_id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(account_id text)) accounts_cte JOIN (SELECT intermediate_result.spent, intermediate_result.account_id FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(spent integer, account_id text)) joined_stats_cte_2 USING (account_id))
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT sum FROM (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_4'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint)) inner_query
DEBUG: Subplan XXX_1 will be written to local file
DEBUG: generating subplan XXX_2 for subquery SELECT sum(joined_stats_cte_2.spent) OVER (PARTITION BY COALESCE(accounts_cte.account_id, NULL::text)) AS sum FROM ((SELECT intermediate_result.account_id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(account_id text)) accounts_cte JOIN (SELECT joined_stats_cte_1.spent, joined_stats_cte_1.account_id FROM ((SELECT stats.spent, stats.account_id FROM (local_shard_execution.stats JOIN (SELECT intermediate_result.account_id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(account_id text)) accounts_cte_2 USING (account_id))) joined_stats_cte_1 JOIN (SELECT intermediate_result.account_id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(account_id text)) accounts_cte_1 USING (account_id))) joined_stats_cte_2 USING (account_id))
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT sum FROM (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint)) inner_query
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
DEBUG: Subplan XXX_2 will be written to local file
DEBUG: Subplan XXX_3 will be written to local file
DEBUG: Subplan XXX_4 will be written to local file
sum
---------------------------------------------------------------------
100

View File

@ -410,6 +410,56 @@ LIMIT 10 OFFSET 20;
1453 | 5
(10 rows)
SELECT
customer_keys.o_custkey,
SUM(order_count) AS total_order_count
FROM
(SELECT o_custkey, o_orderstatus, COUNT(*) over (partition by o_orderstatus) AS order_count
FROM orders GROUP BY o_custkey, o_orderstatus ) customer_keys
GROUP BY
customer_keys.o_custkey
ORDER BY
customer_keys.o_custkey DESC
LIMIT 10 OFFSET 20;
o_custkey | total_order_count
---------------------------------------------------------------------
1466 | 759
1465 | 759
1463 | 1499
1462 | 1499
1460 | 759
1459 | 1499
1457 | 740
1456 | 830
1454 | 1499
1453 | 1499
(10 rows)
SELECT
customer_keys.o_custkey,
SUM(order_count1 + order_count) AS total_order_count
FROM
(SELECT o_custkey, o_orderstatus, count(*) order_count1, COUNT(*) over (partition by o_orderstatus) AS order_count
FROM orders GROUP BY o_custkey, o_orderstatus ) customer_keys
GROUP BY
customer_keys.o_custkey
ORDER BY
customer_keys.o_custkey DESC
LIMIT 10 OFFSET 20;
o_custkey | total_order_count
---------------------------------------------------------------------
1466 | 760
1465 | 761
1463 | 1503
1462 | 1509
1460 | 760
1459 | 1505
1457 | 741
1456 | 833
1454 | 1501
1453 | 1504
(10 rows)
RESET citus.task_executor_type;
SET client_min_messages TO DEBUG1;
-- Ensure that we push down LIMIT and OFFSET properly

View File

@ -396,6 +396,56 @@ LIMIT 10 OFFSET 20;
1453 | 5
(10 rows)
SELECT
customer_keys.o_custkey,
SUM(order_count) AS total_order_count
FROM
(SELECT o_custkey, o_orderstatus, COUNT(*) over (partition by o_orderstatus) AS order_count
FROM orders GROUP BY o_custkey, o_orderstatus ) customer_keys
GROUP BY
customer_keys.o_custkey
ORDER BY
customer_keys.o_custkey DESC
LIMIT 10 OFFSET 20;
o_custkey | total_order_count
---------------------------------------------------------------------
1466 | 759
1465 | 759
1463 | 1499
1462 | 1499
1460 | 759
1459 | 1499
1457 | 740
1456 | 830
1454 | 1499
1453 | 1499
(10 rows)
SELECT
customer_keys.o_custkey,
SUM(order_count1 + order_count) AS total_order_count
FROM
(SELECT o_custkey, o_orderstatus, count(*) order_count1, COUNT(*) over (partition by o_orderstatus) AS order_count
FROM orders GROUP BY o_custkey, o_orderstatus ) customer_keys
GROUP BY
customer_keys.o_custkey
ORDER BY
customer_keys.o_custkey DESC
LIMIT 10 OFFSET 20;
o_custkey | total_order_count
---------------------------------------------------------------------
1466 | 760
1465 | 761
1463 | 1503
1462 | 1509
1460 | 760
1459 | 1505
1457 | 741
1456 | 833
1454 | 1501
1453 | 1504
(10 rows)
RESET citus.task_executor_type;
SET client_min_messages TO DEBUG1;
-- Ensure that we push down LIMIT and OFFSET properly

View File

@ -453,11 +453,8 @@ HashAggregate
Tasks Shown: One of 2
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> HashAggregate
-> Seq Scan on public.lineitem_290000 lineitem
Output: l_quantity, l_quantity
Group Key: lineitem.l_quantity
-> Seq Scan on public.lineitem_290000 lineitem
Output: l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment
-- Subquery pushdown tests with explain
EXPLAIN (COSTS OFF)
SELECT

View File

@ -637,7 +637,7 @@ SELECT count(*), count(DISTINCT user_id), avg(user_id) FROM agg_results_window;
(1 row)
TRUNCATE agg_results_window;
-- lets have some queries that Citus shouldn't push down
-- test queries where the window function isn't pushed down
INSERT INTO agg_results_window (user_id, agg_time, value_2_agg)
SELECT
user_id, time, rnk
@ -653,10 +653,6 @@ ORDER BY
3 DESC, 1 DESC, 2 DESC
LIMIT
10;
ERROR: could not run distributed query because the window function that is used cannot be pushed down
HINT: Window functions are supported in two ways. Either add an equality filter on the distributed tables' partition column or use the window functions with a PARTITION BY clause containing the distribution column
-- user needs to supply partition by which should
-- include the distribution key
INSERT INTO agg_results_window (user_id, agg_time, value_2_agg)
SELECT
user_id, time, rnk
@ -672,10 +668,6 @@ ORDER BY
3 DESC, 1 DESC, 2 DESC
LIMIT
10;
ERROR: could not run distributed query because the window function that is used cannot be pushed down
HINT: Window functions are supported in two ways. Either add an equality filter on the distributed tables' partition column or use the window functions with a PARTITION BY clause containing the distribution column
-- user needs to supply partition by which should
-- include the distribution key
INSERT INTO agg_results_window (user_id, agg_time, value_2_agg)
SELECT
user_id, time, rnk
@ -691,9 +683,6 @@ ORDER BY
3 DESC, 1 DESC, 2 DESC
LIMIT
10;
ERROR: could not run distributed query because the window function that is used cannot be pushed down
HINT: Window functions are supported in two ways. Either add an equality filter on the distributed tables' partition column or use the window functions with a PARTITION BY clause containing the distribution column
-- w2 should not be pushed down
INSERT INTO agg_results_window (user_id, value_1_agg, value_2_agg)
SELECT * FROM
(
@ -709,9 +698,6 @@ SELECT * FROM
) as foo
LIMIT
10;
ERROR: could not run distributed query because the window function that is used cannot be pushed down
HINT: Window functions are supported in two ways. Either add an equality filter on the distributed tables' partition column or use the window functions with a PARTITION BY clause containing the distribution column
-- GROUP BY includes the partition key, but not the WINDOW function
INSERT INTO agg_results_window (user_id, agg_time, value_2_agg)
SELECT
user_id, time, my_rank
@ -727,9 +713,6 @@ FROM
) as foo
WHERE
my_rank > 125;
ERROR: could not run distributed query because the window function that is used cannot be pushed down
HINT: Window functions are supported in two ways. Either add an equality filter on the distributed tables' partition column or use the window functions with a PARTITION BY clause containing the distribution column
-- GROUP BY includes the partition key, but not the WINDOW function
INSERT INTO agg_results_window (user_id, agg_time, value_2_agg)
SELECT
user_id, time, my_rank
@ -745,9 +728,6 @@ FROM
) as foo
WHERE
my_rank > 125;
ERROR: could not run distributed query because the window function that is used cannot be pushed down
HINT: Window functions are supported in two ways. Either add an equality filter on the distributed tables' partition column or use the window functions with a PARTITION BY clause containing the distribution column
-- w2 should not be allowed
INSERT INTO agg_results_window (user_id, value_2_agg, value_3_agg)
SELECT * FROM
(
@ -761,9 +741,6 @@ SELECT * FROM
WINDOW w1 AS (PARTITION BY users_table.user_id, events_table.event_type ORDER BY events_table.time),
w2 AS (ORDER BY events_table.time)
) as foo;
ERROR: could not run distributed query because the window function that is used cannot be pushed down
HINT: Window functions are supported in two ways. Either add an equality filter on the distributed tables' partition column or use the window functions with a PARTITION BY clause containing the distribution column
-- unsupported window function with an override
INSERT INTO agg_results_window(user_id, agg_time, value_2_agg)
SELECT * FROM (
SELECT
@ -779,9 +756,6 @@ SELECT * FROM (
WINDOW
w2 as (PARTITION BY user_id, time)
) a;
ERROR: could not run distributed query because the window function that is used cannot be pushed down
HINT: Window functions are supported in two ways. Either add an equality filter on the distributed tables' partition column or use the window functions with a PARTITION BY clause containing the distribution column
-- Subquery in where with unsupported window function
INSERT INTO agg_results_window(user_id)
SELECT
user_id
@ -801,7 +775,6 @@ GROUP BY
user_id;
ERROR: cannot push down this subquery
DETAIL: Window functions without PARTITION BY on distribution column is currently unsupported
-- Aggregate function on distribution column should error out
INSERT INTO agg_results_window(user_id, value_2_agg)
SELECT * FROM (
SELECT
@ -811,10 +784,6 @@ SELECT * FROM (
GROUP BY
user_id
) a;
ERROR: could not run distributed query because the window function that is used cannot be pushed down
HINT: Window functions are supported in two ways. Either add an equality filter on the distributed tables' partition column or use the window functions with a PARTITION BY clause containing the distribution column
-- UNION with only one subquery which has a partition on non-distribution column should
-- error out
INSERT INTO agg_results_window(user_id, value_1_agg)
SELECT *
FROM (
@ -849,6 +818,4 @@ FROM (
user_id
)
) AS ftop;
ERROR: could not run distributed query because the window function that is used cannot be pushed down
HINT: Window functions are supported in two ways. Either add an equality filter on the distributed tables' partition column or use the window functions with a PARTITION BY clause containing the distribution column
DROP VIEW view_with_window_func;

View File

@ -1194,19 +1194,44 @@ DETAIL: distribution column value: 1
11814 | 5
(5 rows)
-- window functions are not supported for not router plannable queries
SELECT id, MIN(id) over (order by word_count)
FROM articles_hash_mx
WHERE author_id = 1 or author_id = 2;
WHERE author_id = 1 or author_id = 2
ORDER BY 1;
DEBUG: Router planner cannot handle multi-shard select queries
ERROR: could not run distributed query because the window function that is used cannot be pushed down
HINT: Window functions are supported in two ways. Either add an equality filter on the distributed tables' partition column or use the window functions with a PARTITION BY clause containing the distribution column
id | min
---------------------------------------------------------------------
1 | 1
2 | 1
11 | 11
12 | 1
21 | 11
22 | 11
31 | 11
32 | 1
41 | 1
42 | 1
(10 rows)
SELECT LAG(title, 1) over (ORDER BY word_count) prev, title, word_count
FROM articles_hash_mx
WHERE author_id = 5 or author_id = 2;
WHERE author_id = 5 or author_id = 2
ORDER BY 2;
DEBUG: Router planner cannot handle multi-shard select queries
ERROR: could not run distributed query because the window function that is used cannot be pushed down
HINT: Window functions are supported in two ways. Either add an equality filter on the distributed tables' partition column or use the window functions with a PARTITION BY clause containing the distribution column
prev | title | word_count
---------------------------------------------------------------------
aruru | abducing | 13642
antipope | adversa | 3164
| afrasia | 864
aminate | amazon | 11342
antehall | aminate | 9089
adversa | antehall | 7707
afrasia | antipope | 2728
ausable | archiblast | 18185
amazon | aruru | 11389
abducing | ausable | 15885
(10 rows)
-- complex query hitting a single shard
SELECT
count(DISTINCT CASE

View File

@ -103,17 +103,11 @@ FROM
DEBUG: CTE joined_source_table_cte_1 is going to be inlined via distributed planning
DEBUG: CTE joined_source_table_cte_2 is going to be inlined via distributed planning
DEBUG: generating subplan XXX_1 for CTE dest_table_cte: SELECT a FROM public.dest_table
DEBUG: generating subplan XXX_1 for CTE dest_table_cte: SELECT a FROM public.dest_table
DEBUG: generating subplan XXX_2 for CTE joined_source_table_cte_1: SELECT source_table.b, source_table.a FROM (public.source_table JOIN (SELECT intermediate_result.a FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer)) dest_table_cte USING (a))
DEBUG: generating subplan XXX_3 for CTE joined_source_table_cte_2: SELECT joined_source_table_cte_1.b, joined_source_table_cte_1.a FROM ((SELECT intermediate_result.b, intermediate_result.a FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(b integer, a integer)) joined_source_table_cte_1 JOIN (SELECT intermediate_result.a FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer)) dest_table_cte USING (a))
DEBUG: generating subplan XXX_4 for subquery SELECT sum(joined_source_table_cte_2.b) OVER (PARTITION BY COALESCE(dest_table_cte.a, NULL::integer)) AS sum FROM ((SELECT intermediate_result.a FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer)) dest_table_cte JOIN (SELECT intermediate_result.b, intermediate_result.a FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(b integer, a integer)) joined_source_table_cte_2 USING (a))
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT sum FROM (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_4'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint)) inner_query
DEBUG: Subplan XXX_1 will be written to local file
DEBUG: generating subplan XXX_2 for subquery SELECT sum(joined_source_table_cte_2.b) OVER (PARTITION BY COALESCE(dest_table_cte.a, NULL::integer)) AS sum FROM ((SELECT intermediate_result.a FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer)) dest_table_cte JOIN (SELECT joined_source_table_cte_1.b, joined_source_table_cte_1.a FROM ((SELECT source_table.b, source_table.a FROM (public.source_table JOIN (SELECT intermediate_result.a FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer)) dest_table_cte_2 USING (a))) joined_source_table_cte_1 JOIN (SELECT intermediate_result.a FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer)) dest_table_cte_1 USING (a))) joined_source_table_cte_2 USING (a))
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT sum FROM (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint)) inner_query
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
DEBUG: Subplan XXX_2 will be written to local file
DEBUG: Subplan XXX_3 will be written to local file
DEBUG: Subplan XXX_4 will be written to local file
sum
---------------------------------------------------------------------
5
@ -145,16 +139,11 @@ FROM
DEBUG: CTE joined_source_table_cte_1 is going to be inlined via distributed planning
DEBUG: CTE joined_source_table_cte_2 is going to be inlined via distributed planning
DEBUG: generating subplan XXX_1 for CTE dest_table_cte: SELECT a FROM public.dest_table
DEBUG: generating subplan XXX_1 for CTE dest_table_cte: SELECT a FROM public.dest_table
DEBUG: generating subplan XXX_2 for CTE joined_source_table_cte_1: SELECT source_table.b, source_table.a FROM (public.source_table JOIN (SELECT intermediate_result.a FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer)) dest_table_cte USING (a))
DEBUG: generating subplan XXX_3 for CTE joined_source_table_cte_2: SELECT joined_source_table_cte_1.b, joined_source_table_cte_1.a FROM ((SELECT intermediate_result.b, intermediate_result.a FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(b integer, a integer)) joined_source_table_cte_1 JOIN (SELECT intermediate_result.a FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer)) dest_table_cte USING (a))
DEBUG: generating subplan XXX_4 for subquery SELECT sum(joined_source_table_cte_2.b) OVER (PARTITION BY COALESCE(dest_table_cte.a, NULL::integer)) AS sum FROM ((SELECT intermediate_result.a FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer)) dest_table_cte JOIN (SELECT intermediate_result.b, intermediate_result.a FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(b integer, a integer)) joined_source_table_cte_2 USING (a))
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT sum FROM (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_4'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint)) inner_query
DEBUG: generating subplan XXX_2 for subquery SELECT sum(joined_source_table_cte_2.b) OVER (PARTITION BY COALESCE(dest_table_cte.a, NULL::integer)) AS sum FROM ((SELECT intermediate_result.a FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer)) dest_table_cte JOIN (SELECT joined_source_table_cte_1.b, joined_source_table_cte_1.a FROM ((SELECT source_table.b, source_table.a FROM (public.source_table JOIN (SELECT intermediate_result.a FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer)) dest_table_cte_2 USING (a))) joined_source_table_cte_1 JOIN (SELECT intermediate_result.a FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer)) dest_table_cte_1 USING (a))) joined_source_table_cte_2 USING (a))
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT sum FROM (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint)) inner_query
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
DEBUG: Subplan XXX_2 will be sent to localhost:xxxxx
DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx
DEBUG: Subplan XXX_4 will be sent to localhost:xxxxx
sum
---------------------------------------------------------------------
5
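The dropped DEBUG lines reflect two changes visible in these hunks: the dest_table_cte subplan is generated once instead of twice, and the single-reference CTEs are inlined rather than materialized as separate intermediate results. A reduced sketch of the surviving pattern, with hypothetical tables dest_table(a) and source_table(a, b):
-- the CTE is planned once into an intermediate result and reused;
-- the window function, partitioned on a non-distribution column,
-- is evaluated in a second subplan rather than raising an error
WITH dest_table_cte AS (SELECT a FROM dest_table)
SELECT sum(s.b) OVER (PARTITION BY c.a)
FROM dest_table_cte c
JOIN source_table s USING (a);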

View File

@ -1311,7 +1311,6 @@ DETAIL: distribution column value: 1
41 | 1 | aznavour | 11814
(3 rows)
-- window functions are supported if query is router plannable
SELECT LAG(title, 1) over (ORDER BY word_count) prev, title, word_count
FROM articles_hash
WHERE author_id = 5;
@ -1388,19 +1387,44 @@ DETAIL: distribution column value: 1
11814 | 5
(5 rows)
-- window functions are not supported for not router plannable queries
SELECT id, MIN(id) over (order by word_count)
FROM articles_hash
WHERE author_id = 1 or author_id = 2;
WHERE author_id = 1 or author_id = 2
ORDER BY 1;
DEBUG: Router planner cannot handle multi-shard select queries
ERROR: could not run distributed query because the window function that is used cannot be pushed down
HINT: Window functions are supported in two ways. Either add an equality filter on the distributed tables' partition column or use the window functions with a PARTITION BY clause containing the distribution column
id | min
---------------------------------------------------------------------
1 | 1
2 | 1
11 | 11
12 | 1
21 | 11
22 | 11
31 | 11
32 | 1
41 | 1
42 | 1
(10 rows)
SELECT LAG(title, 1) over (ORDER BY word_count) prev, title, word_count
FROM articles_hash
WHERE author_id = 5 or author_id = 2;
WHERE author_id = 5 or author_id = 2
ORDER BY 2;
DEBUG: Router planner cannot handle multi-shard select queries
ERROR: could not run distributed query because the window function that is used cannot be pushed down
HINT: Window functions are supported in two ways. Either add an equality filter on the distributed tables' partition column or use the window functions with a PARTITION BY clause containing the distribution column
prev | title | word_count
---------------------------------------------------------------------
aruru | abducing | 13642
antipope | adversa | 3164
| afrasia | 864
aminate | amazon | 11342
antehall | aminate | 9089
adversa | antehall | 7707
afrasia | antipope | 2728
ausable | archiblast | 18185
amazon | aruru | 11389
abducing | ausable | 15885
(10 rows)
-- where false queries are router plannable
SELECT *
FROM articles_hash

View File

@ -1116,18 +1116,18 @@ EXPLAIN (COSTS FALSE)
SELECT DISTINCT ON (l_orderkey) l_orderkey, l_partkey, l_suppkey
FROM lineitem_hash_part
WHERE l_orderkey < 35
ORDER BY 1;
ORDER BY 1, 2, 3;
l_orderkey | l_partkey | l_suppkey
---------------------------------------------------------------------
1 | 155190 | 7706
1 | 2132 | 4633
2 | 106170 | 1191
3 | 4297 | 1798
4 | 88035 | 5560
5 | 108570 | 8571
5 | 37531 | 35
6 | 139636 | 2150
7 | 182052 | 9607
32 | 82704 | 7721
33 | 61336 | 8855
7 | 79251 | 1759
32 | 2743 | 7744
33 | 33918 | 3919
34 | 88362 | 871
(10 rows)
@ -1135,12 +1135,12 @@ EXPLAIN (COSTS FALSE)
SELECT DISTINCT ON (l_orderkey) l_orderkey, l_partkey, l_suppkey
FROM lineitem_hash_part
WHERE l_orderkey < 35
ORDER BY 1;
ORDER BY 1, 2, 3;
QUERY PLAN
---------------------------------------------------------------------
Unique
-> Sort
Sort Key: remote_scan.l_orderkey
Sort Key: remote_scan.l_orderkey, remote_scan.l_partkey, remote_scan.l_suppkey
-> Custom Scan (Citus Adaptive)
Task Count: 4
Tasks Shown: One of 4
@ -1148,7 +1148,7 @@ EXPLAIN (COSTS FALSE)
Node: host=localhost port=xxxxx dbname=regression
-> Unique
-> Sort
Sort Key: l_orderkey
Sort Key: l_orderkey, l_partkey, l_suppkey
-> Seq Scan on lineitem_hash_part_360041 lineitem_hash_part
Filter: (l_orderkey < 35)
(13 rows)
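The extra sort keys added above make the test deterministic: DISTINCT ON keeps the first row per l_orderkey in ORDER BY order, and with only ORDER BY 1 the surviving l_partkey/l_suppkey pair is arbitrary, as the changed expected rows show. The rule in brief, over a hypothetical table lineitem(l_orderkey, l_partkey, l_suppkey):
-- arbitrary: any row of each l_orderkey group may survive
SELECT DISTINCT ON (l_orderkey) l_orderkey, l_partkey, l_suppkey
FROM lineitem ORDER BY l_orderkey;
-- deterministic: the smallest (l_partkey, l_suppkey) row survives
SELECT DISTINCT ON (l_orderkey) l_orderkey, l_partkey, l_suppkey
FROM lineitem ORDER BY l_orderkey, l_partkey, l_suppkey;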

View File

@ -775,7 +775,7 @@ EXPLAIN (COSTS FALSE, VERBOSE TRUE)
Output: events_table_1.user_id, events_table_1.value_2
(62 rows)
-- lets have some queries that Citus shouldn't push down
-- test with window functions which aren't pushed down
SELECT
user_id, time, rnk
FROM
@ -790,10 +790,20 @@ ORDER BY
3 DESC, 1 DESC, 2 DESC
LIMIT
10;
ERROR: could not run distributed query because the window function that is used cannot be pushed down
HINT: Window functions are supported in two ways. Either add an equality filter on the distributed tables' partition column or use the window functions with a PARTITION BY clause containing the distribution column
-- user needs to supply partition by which should
-- include the distribution key
user_id | time | rnk
---------------------------------------------------------------------
1 | Wed Nov 22 19:07:03.846437 2017 | 24
5 | Wed Nov 22 20:45:35.99031 2017 | 23
1 | Wed Nov 22 18:49:42.327403 2017 | 23
3 | Wed Nov 22 21:12:24.542921 2017 | 22
3 | Wed Nov 22 20:23:46.906523 2017 | 22
6 | Wed Nov 22 20:36:09.106561 2017 | 21
3 | Wed Nov 22 21:26:21.185134 2017 | 21
1 | Wed Nov 22 19:03:01.772353 2017 | 21
6 | Wed Nov 22 22:44:48.458334 2017 | 20
3 | Wed Nov 22 22:05:38.409323 2017 | 20
(10 rows)
SELECT
user_id, time, rnk
FROM
@ -808,10 +818,20 @@ ORDER BY
3 DESC, 1 DESC, 2 DESC
LIMIT
10;
ERROR: could not run distributed query because the window function that is used cannot be pushed down
HINT: Window functions are supported in two ways. Either add an equality filter on the distributed tables' partition column or use the window functions with a PARTITION BY clause containing the distribution column
-- user needs to supply partition by which should
-- include the distribution key
user_id | time | rnk
---------------------------------------------------------------------
6 | Thu Nov 23 14:00:13.20013 2017 | 1
6 | Thu Nov 23 11:16:13.106691 2017 | 1
6 | Thu Nov 23 07:27:32.822068 2017 | 1
6 | Thu Nov 23 02:06:53.132461 2017 | 1
6 | Thu Nov 23 00:45:41.784391 2017 | 1
6 | Thu Nov 23 00:01:48.155345 2017 | 1
6 | Wed Nov 22 23:15:15.875499 2017 | 1
6 | Wed Nov 22 22:44:48.458334 2017 | 1
6 | Wed Nov 22 21:17:09.549341 2017 | 1
6 | Wed Nov 22 20:36:09.106561 2017 | 1
(10 rows)
SELECT
user_id, time, rnk
FROM
@ -826,9 +846,20 @@ ORDER BY
3 DESC, 1 DESC, 2 DESC
LIMIT
10;
ERROR: could not run distributed query because the window function that is used cannot be pushed down
HINT: Window functions are supported in two ways. Either add an equality filter on the distributed tables' partition column or use the window functions with a PARTITION BY clause containing the distribution column
-- w2 should not be pushed down
user_id | time | rnk
---------------------------------------------------------------------
3 | Wed Nov 22 18:36:16.372893 2017 | 101
1 | Wed Nov 22 18:49:42.327403 2017 | 100
4 | Wed Nov 22 19:00:10.396739 2017 | 99
1 | Wed Nov 22 19:03:01.772353 2017 | 98
1 | Wed Nov 22 19:07:03.846437 2017 | 97
2 | Wed Nov 22 20:16:16.614779 2017 | 96
3 | Wed Nov 22 20:23:46.906523 2017 | 95
6 | Wed Nov 22 20:36:09.106561 2017 | 94
5 | Wed Nov 22 20:45:35.99031 2017 | 93
1 | Wed Nov 22 20:56:21.122638 2017 | 92
(10 rows)
SELECT * FROM
(
SELECT
@ -843,9 +874,20 @@ SELECT * FROM
) as foo
ORDER BY 3 DESC, 1 DESC, 2 DESC NULLS LAST
LIMIT 10;
ERROR: could not run distributed query because the window function that is used cannot be pushed down
HINT: Window functions are supported in two ways. Either add an equality filter on the distributed tables' partition column or use the window functions with a PARTITION BY clause containing the distribution column
-- w2 should not be pushed down
user_id | lag | rank
---------------------------------------------------------------------
2 | 2 | 73
4 | 4 | 70
3 | 3 | 69
2 | 2 | 55
5 | 5 | 53
5 | | 53
3 | 3 | 52
4 | 4 | 47
2 | 2 | 37
3 | 3 | 35
(10 rows)
SELECT * FROM
(
SELECT
@ -862,9 +904,20 @@ ORDER BY
3 DESC, 1 DESC, 2 DESC NULLS LAST
LIMIT
10;
ERROR: could not run distributed query because the window function that is used cannot be pushed down
HINT: Window functions are supported in two ways. Either add an equality filter on the distributed tables' partition column or use the window functions with a PARTITION BY clause containing the distribution column
-- GROUP BY includes the partition key, but not the WINDOW function
user_id | lag | rank
---------------------------------------------------------------------
4 | 4 | 1262
3 | 3 | 1245
2 | 2 | 1227
4 | 4 | 1204
4 | | 1204
5 | 5 | 1178
5 | 5 | 1152
5 | 5 | 1126
4 | 4 | 1103
2 | 2 | 1085
(10 rows)
SELECT
user_id, time, my_rank
FROM
@ -883,9 +936,20 @@ ORDER BY
3 DESC, 1 DESC,2 DESC
LIMIT
10;
ERROR: could not run distributed query because the window function that is used cannot be pushed down
HINT: Window functions are supported in two ways. Either add an equality filter on the distributed tables' partition column or use the window functions with a PARTITION BY clause containing the distribution column
-- GROUP BY includes the partition key, but not the WINDOW function
user_id | time | my_rank
---------------------------------------------------------------------
4 | Wed Nov 22 00:00:00 2017 | 12
6 | Wed Nov 22 00:00:00 2017 | 11
5 | Wed Nov 22 00:00:00 2017 | 10
6 | Thu Nov 23 00:00:00 2017 | 9
3 | Thu Nov 23 00:00:00 2017 | 8
1 | Thu Nov 23 00:00:00 2017 | 7
1 | Wed Nov 22 00:00:00 2017 | 6
2 | Thu Nov 23 00:00:00 2017 | 5
4 | Thu Nov 23 00:00:00 2017 | 4
2 | Wed Nov 22 00:00:00 2017 | 3
(10 rows)
SELECT
user_id, time, my_rank
FROM
@ -904,9 +968,20 @@ ORDER BY
3 DESC, 1 DESC,2 DESC
LIMIT
10;
ERROR: could not run distributed query because the window function that is used cannot be pushed down
HINT: Window functions are supported in two ways. Either add an equality filter on the distributed tables' partition column or use the window functions with a PARTITION BY clause containing the distribution column
-- Overriding window function but not supported
user_id | time | my_rank
---------------------------------------------------------------------
6 | Thu Nov 23 00:00:00 2017 | 6
4 | Wed Nov 22 00:00:00 2017 | 6
6 | Wed Nov 22 00:00:00 2017 | 5
3 | Thu Nov 23 00:00:00 2017 | 5
5 | Wed Nov 22 00:00:00 2017 | 4
1 | Thu Nov 23 00:00:00 2017 | 4
2 | Thu Nov 23 00:00:00 2017 | 3
1 | Wed Nov 22 00:00:00 2017 | 3
4 | Thu Nov 23 00:00:00 2017 | 2
2 | Wed Nov 22 00:00:00 2017 | 2
(10 rows)
SELECT * FROM (
SELECT
user_id, date_trunc('day', time) as time, sum(rank) OVER w2
@ -923,9 +998,111 @@ SELECT * FROM (
) a
ORDER BY
1,2,3;
ERROR: could not run distributed query because the window function that is used cannot be pushed down
HINT: Window functions are supported in two ways. Either add an equality filter on the distributed tables' partition column or use the window functions with a PARTITION BY clause containing the distribution column
-- Aggregate function on distribution column should error out
user_id | time | sum
---------------------------------------------------------------------
1 | Wed Nov 22 00:00:00 2017 | 1
1 | Thu Nov 23 00:00:00 2017 | 1
1 | Thu Nov 23 00:00:00 2017 | 1
1 | Thu Nov 23 00:00:00 2017 | 1
1 | Thu Nov 23 00:00:00 2017 | 1
1 | Thu Nov 23 00:00:00 2017 | 1
1 | Thu Nov 23 00:00:00 2017 | 1
2 | Wed Nov 22 00:00:00 2017 | 1
2 | Thu Nov 23 00:00:00 2017 | 1
2 | Thu Nov 23 00:00:00 2017 | 1
2 | Thu Nov 23 00:00:00 2017 | 1
2 | Thu Nov 23 00:00:00 2017 | 1
2 | Thu Nov 23 00:00:00 2017 | 1
2 | Thu Nov 23 00:00:00 2017 | 1
2 | Thu Nov 23 00:00:00 2017 | 1
2 | Thu Nov 23 00:00:00 2017 | 1
2 | Thu Nov 23 00:00:00 2017 | 1
2 | Thu Nov 23 00:00:00 2017 | 1
2 | Thu Nov 23 00:00:00 2017 | 1
2 | Thu Nov 23 00:00:00 2017 | 1
2 | Thu Nov 23 00:00:00 2017 | 1
2 | Thu Nov 23 00:00:00 2017 | 1
2 | Thu Nov 23 00:00:00 2017 | 1
2 | Thu Nov 23 00:00:00 2017 | 1
2 | Thu Nov 23 00:00:00 2017 | 1
3 | Wed Nov 22 00:00:00 2017 | 1
3 | Wed Nov 22 00:00:00 2017 | 1
3 | Wed Nov 22 00:00:00 2017 | 1
3 | Thu Nov 23 00:00:00 2017 | 1
3 | Thu Nov 23 00:00:00 2017 | 1
3 | Thu Nov 23 00:00:00 2017 | 1
3 | Thu Nov 23 00:00:00 2017 | 1
3 | Thu Nov 23 00:00:00 2017 | 1
3 | Thu Nov 23 00:00:00 2017 | 1
3 | Thu Nov 23 00:00:00 2017 | 1
3 | Thu Nov 23 00:00:00 2017 | 1
3 | Thu Nov 23 00:00:00 2017 | 1
3 | Thu Nov 23 00:00:00 2017 | 1
3 | Thu Nov 23 00:00:00 2017 | 1
3 | Thu Nov 23 00:00:00 2017 | 1
3 | Thu Nov 23 00:00:00 2017 | 1
3 | Thu Nov 23 00:00:00 2017 | 1
4 | Wed Nov 22 00:00:00 2017 | 1
4 | Wed Nov 22 00:00:00 2017 | 1
4 | Wed Nov 22 00:00:00 2017 | 1
4 | Thu Nov 23 00:00:00 2017 | 1
4 | Thu Nov 23 00:00:00 2017 | 1
4 | Thu Nov 23 00:00:00 2017 | 1
4 | Thu Nov 23 00:00:00 2017 | 1
4 | Thu Nov 23 00:00:00 2017 | 1
4 | Thu Nov 23 00:00:00 2017 | 1
4 | Thu Nov 23 00:00:00 2017 | 1
4 | Thu Nov 23 00:00:00 2017 | 1
4 | Thu Nov 23 00:00:00 2017 | 1
4 | Thu Nov 23 00:00:00 2017 | 1
4 | Thu Nov 23 00:00:00 2017 | 1
4 | Thu Nov 23 00:00:00 2017 | 1
4 | Thu Nov 23 00:00:00 2017 | 1
4 | Thu Nov 23 00:00:00 2017 | 1
4 | Thu Nov 23 00:00:00 2017 | 1
4 | Thu Nov 23 00:00:00 2017 | 1
4 | Thu Nov 23 00:00:00 2017 | 1
4 | Thu Nov 23 00:00:00 2017 | 1
4 | Thu Nov 23 00:00:00 2017 | 1
4 | Thu Nov 23 00:00:00 2017 | 1
5 | Wed Nov 22 00:00:00 2017 | 1
5 | Wed Nov 22 00:00:00 2017 | 1
5 | Wed Nov 22 00:00:00 2017 | 1
5 | Wed Nov 22 00:00:00 2017 | 1
5 | Wed Nov 22 00:00:00 2017 | 1
5 | Thu Nov 23 00:00:00 2017 | 1
5 | Thu Nov 23 00:00:00 2017 | 1
5 | Thu Nov 23 00:00:00 2017 | 1
5 | Thu Nov 23 00:00:00 2017 | 1
5 | Thu Nov 23 00:00:00 2017 | 1
5 | Thu Nov 23 00:00:00 2017 | 1
5 | Thu Nov 23 00:00:00 2017 | 1
5 | Thu Nov 23 00:00:00 2017 | 1
5 | Thu Nov 23 00:00:00 2017 | 1
5 | Thu Nov 23 00:00:00 2017 | 1
5 | Thu Nov 23 00:00:00 2017 | 1
5 | Thu Nov 23 00:00:00 2017 | 1
5 | Thu Nov 23 00:00:00 2017 | 1
5 | Thu Nov 23 00:00:00 2017 | 1
5 | Thu Nov 23 00:00:00 2017 | 1
5 | Thu Nov 23 00:00:00 2017 | 1
5 | Thu Nov 23 00:00:00 2017 | 1
5 | Thu Nov 23 00:00:00 2017 | 1
5 | Thu Nov 23 00:00:00 2017 | 1
5 | Thu Nov 23 00:00:00 2017 | 1
5 | Thu Nov 23 00:00:00 2017 | 1
6 | Wed Nov 22 00:00:00 2017 | 1
6 | Wed Nov 22 00:00:00 2017 | 1
6 | Thu Nov 23 00:00:00 2017 | 1
6 | Thu Nov 23 00:00:00 2017 | 1
6 | Thu Nov 23 00:00:00 2017 | 1
6 | Thu Nov 23 00:00:00 2017 | 1
6 | Thu Nov 23 00:00:00 2017 | 1
6 | Thu Nov 23 00:00:00 2017 | 1
6 | Thu Nov 23 00:00:00 2017 | 1
6 | Thu Nov 23 00:00:00 2017 | 1
(101 rows)
SELECT * FROM (
SELECT
user_id, COUNT(*) OVER (PARTITION BY sum(user_id), MIN(value_2))
@ -936,8 +1113,16 @@ SELECT * FROM (
) a
ORDER BY
1 DESC, 2 DESC;
ERROR: could not run distributed query because the window function that is used cannot be pushed down
HINT: Window functions are supported in two ways. Either add an equality filter on the distributed tables' partition column or use the window functions with a PARTITION BY clause containing the distribution column
user_id | count
---------------------------------------------------------------------
6 | 1
5 | 1
4 | 1
3 | 1
2 | 1
1 | 1
(6 rows)
-- test with reference table partitioned on only a column from reference table
SELECT *
FROM
@ -951,10 +1136,30 @@ ORDER BY
1, 2, 3
LIMIT
20;
ERROR: could not run distributed query because the window function that is used cannot be pushed down
HINT: Window functions are supported in two ways. Either add an equality filter on the distributed tables' partition column or use the window functions with a PARTITION BY clause containing the distribution column
-- UNION ALL with only one of them is not partitioned over distribution column which
-- should not be allowed.
user_id | it_name | count
---------------------------------------------------------------------
1 | User_1 | 101
1 | User_2 | 101
1 | User_3 | 101
1 | User_4 | 101
1 | User_5 | 101
1 | User_6 | 101
2 | User_1 | 101
2 | User_2 | 101
2 | User_3 | 101
2 | User_4 | 101
2 | User_5 | 101
2 | User_6 | 101
3 | User_1 | 101
3 | User_2 | 101
3 | User_3 | 101
3 | User_4 | 101
3 | User_5 | 101
3 | User_6 | 101
4 | User_1 | 101
4 | User_2 | 101
(20 rows)
SELECT
max(avg)
FROM
@ -974,10 +1179,15 @@ FROM
GROUP BY user_id
ORDER BY 1 DESC
LIMIT 5;
ERROR: could not run distributed query because the window function that is used cannot be pushed down
HINT: Window functions are supported in two ways. Either add an equality filter on the distributed tables' partition column or use the window functions with a PARTITION BY clause containing the distribution column
-- UNION with only one subquery which has a partition on non-distribution column should
-- error out
max
---------------------------------------------------------------------
5
3.09090909090909
3
3
2.875
(5 rows)
SELECT *
FROM (
( SELECT user_id,
@ -1011,6 +1221,13 @@ FROM (
user_id)) AS ftop
ORDER BY 2 DESC, 1 DESC
LIMIT 5;
ERROR: could not run distributed query because the window function that is used cannot be pushed down
HINT: Window functions are supported in two ways. Either add an equality filter on the distributed tables' partition column or use the window functions with a PARTITION BY clause containing the distribution column
user_id | sum
---------------------------------------------------------------------
5 | 298
6 | 244
1 | 244
4 | 235
2 | 235
(5 rows)
DROP VIEW subq;

View File

@ -747,7 +747,7 @@ RESET citus.subquery_pushdown;
VACUUM ANALYZE users_table;
-- explain tests
EXPLAIN (COSTS FALSE) SELECT user_id FROM recent_selected_users GROUP BY 1 ORDER BY 1;
QUERY PLAN
QUERY PLAN
---------------------------------------------------------------------
Sort
Sort Key: remote_scan.user_id
@ -758,19 +758,17 @@ EXPLAIN (COSTS FALSE) SELECT user_id FROM recent_selected_users GROUP BY 1 ORDER
Tasks Shown: One of 4
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> HashAggregate
Group Key: users_table.user_id
-> Nested Loop
Join Filter: (users_table.user_id = users_table_1.user_id)
-> Sort
Sort Key: (max(users_table_1."time")) DESC
-> HashAggregate
Group Key: users_table_1.user_id
Filter: (max(users_table_1."time") > '2017-11-23 16:20:33.264457'::timestamp without time zone)
-> Seq Scan on users_table_1400256 users_table_1
-> Seq Scan on users_table_1400256 users_table
Filter: ((value_1 >= 1) AND (value_1 < 3))
(21 rows)
-> Nested Loop
Join Filter: (users_table.user_id = users_table_1.user_id)
-> Sort
Sort Key: (max(users_table_1."time")) DESC
-> HashAggregate
Group Key: users_table_1.user_id
Filter: (max(users_table_1."time") > '2017-11-23 16:20:33.264457'::timestamp without time zone)
-> Seq Scan on users_table_1400256 users_table_1
-> Seq Scan on users_table_1400256 users_table
Filter: ((value_1 >= 1) AND (value_1 < 3))
(19 rows)
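The coordinator-level HashAggregate disappeared from this plan because grouping by the distribution column can be finalized entirely on the workers; the coordinator merely concatenates per-shard results. A sketch of why, assuming users_table is hash-distributed on user_id:
-- every user_id group is confined to a single shard, so the worker's
-- HashAggregate already emits final groups and nothing above the
-- Custom Scan (Citus Adaptive) needs to re-aggregate
SELECT user_id, max("time") AS last_seen
FROM users_table
GROUP BY user_id;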
EXPLAIN (COSTS FALSE) SELECT *
FROM (

View File

@ -667,7 +667,7 @@ DEBUG: Plan is router executable
2 | 2
(2 rows)
-- set operations works fine with pushdownable window functions
-- set operations work fine with pushdownable window functions
SELECT x, y, rnk FROM (SELECT *, rank() OVER my_win as rnk FROM test WINDOW my_win AS (PARTITION BY x ORDER BY y DESC)) as foo
UNION
SELECT x, y, rnk FROM (SELECT *, rank() OVER my_win as rnk FROM test WINDOW my_win AS (PARTITION BY x ORDER BY y DESC)) as bar
@ -686,14 +686,25 @@ DEBUG: Plan is router executable
1 | 1 | 1
(2 rows)
-- set operations errors out with non-pushdownable window functions
-- set operations work fine with non-pushdownable window functions
SELECT x, y, rnk FROM (SELECT *, rank() OVER my_win as rnk FROM test WINDOW my_win AS (PARTITION BY y ORDER BY x DESC)) as foo
UNION
SELECT x, y, rnk FROM (SELECT *, rank() OVER my_win as rnk FROM test WINDOW my_win AS (PARTITION BY y ORDER BY x DESC)) as bar;
SELECT x, y, rnk FROM (SELECT *, rank() OVER my_win as rnk FROM test WINDOW my_win AS (PARTITION BY y ORDER BY x DESC)) as bar
ORDER BY 1 DESC, 2 DESC, 3 DESC;
DEBUG: Router planner cannot handle multi-shard select queries
DEBUG: Router planner cannot handle multi-shard select queries
ERROR: could not run distributed query because the window function that is used cannot be pushed down
HINT: Window functions are supported in two ways. Either add an equality filter on the distributed tables' partition column or use the window functions with a PARTITION BY clause containing the distribution column
DEBUG: generating subplan XXX_1 for subquery SELECT x, y, rank() OVER my_win AS rnk FROM recursive_union.test WINDOW my_win AS (PARTITION BY y ORDER BY x DESC)
DEBUG: Router planner cannot handle multi-shard select queries
DEBUG: generating subplan XXX_2 for subquery SELECT x, y, rank() OVER my_win AS rnk FROM recursive_union.test WINDOW my_win AS (PARTITION BY y ORDER BY x DESC)
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT foo.x, foo.y, foo.rnk FROM (SELECT intermediate_result.x, intermediate_result.y, intermediate_result.rnk FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(x integer, y integer, rnk bigint)) foo UNION SELECT bar.x, bar.y, bar.rnk FROM (SELECT intermediate_result.x, intermediate_result.y, intermediate_result.rnk FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(x integer, y integer, rnk bigint)) bar ORDER BY 1 DESC, 2 DESC, 3 DESC
DEBUG: Creating router plan
DEBUG: Plan is router executable
x | y | rnk
---------------------------------------------------------------------
2 | 2 | 1
1 | 1 | 1
(2 rows)
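As the new DEBUG lines show, set operations over non-pushdownable window functions are now planned recursively instead of erroring: each branch is evaluated into an intermediate result and the UNION itself runs as a router query over those results. A minimal sketch, assuming test(x, y) is distributed on x:
-- PARTITION BY y is not the distribution column, so each branch
-- becomes an intermediate result; the UNION then executes over
-- read_intermediate_result() scans on the coordinator
SELECT x, y, rank() OVER (PARTITION BY y ORDER BY x) AS rnk FROM test
UNION
SELECT x, y, rank() OVER (PARTITION BY y ORDER BY x) AS rnk FROM test;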
-- other set operations in joins also cannot be pushed down
SELECT * FROM ((SELECT * FROM test) EXCEPT (SELECT * FROM test ORDER BY x LIMIT 1)) u JOIN test USING (x) ORDER BY 1,2;
DEBUG: Router planner cannot handle multi-shard select queries

View File

@ -2,11 +2,11 @@
-- test recursive planning functionality with subqueries and CTEs
-- ===================================================================
CREATE SCHEMA subquery_deep;
SET search_path TO subquery_and_ctes, public;
SET search_path TO subquery_deep, public;
SET client_min_messages TO DEBUG1;
-- subquery in FROM -> FROM -> FROM should be replaced due to OFFSET
-- one level up subquery should be replaced due to GROUP BY on non partition key
-- one level up subquery should be replaced due to LIMUT
-- one level up subquery should be replaced due to LIMIT
SELECT
DISTINCT user_id
FROM

View File

@ -81,27 +81,6 @@ DEBUG: generating subplan XXX_1 for subquery SELECT DISTINCT users_table.user_i
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT user_id FROM (SELECT intermediate_result.user_id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer)) foo ORDER BY user_id DESC
ERROR: cannot handle complex subqueries when the router executor is disabled
SET citus.enable_router_execution TO true;
-- window functions are not allowed if they're not partitioned on the distribution column
SELECT
*
FROM
(
SELECT
user_id, time, rnk
FROM
(
SELECT
*, rank() OVER my_win as rnk
FROM
events_table
WINDOW my_win AS (PARTITION BY event_type ORDER BY time DESC)
) as foo
ORDER BY
3 DESC, 1 DESC, 2 DESC
LIMIT
10) as foo;
ERROR: could not run distributed query because the window function that is used cannot be pushed down
HINT: Window functions are supported in two ways. Either add an equality filter on the distributed tables' partition column or use the window functions with a PARTITION BY clause containing the distribution column
-- OUTER JOINs where the outer part is recursively planned and not the other way
-- around is not supported
SELECT

View File

@ -248,8 +248,7 @@ LIMIT 10;
3 | 18
(10 rows)
-- similar query with no distribution column is on the partition by clause
-- is not supported
-- similar query with no distribution column on the partition by clause
SELECT
DISTINCT ON (events_table.user_id, rnk) events_table.user_id, rank() OVER my_win AS rnk
FROM
@ -261,8 +260,20 @@ WINDOW
ORDER BY
rnk DESC, 1 DESC
LIMIT 10;
ERROR: could not run distributed query because the window function that is used cannot be pushed down
HINT: Window functions are supported in two ways. Either add an equality filter on the distributed tables' partition column or use the window functions with a PARTITION BY clause containing the distribution column
user_id | rnk
---------------------------------------------------------------------
3 | 7
2 | 7
3 | 6
2 | 6
4 | 5
3 | 5
2 | 5
1 | 5
6 | 4
5 | 4
(10 rows)
-- ORDER BY in the window function is an aggregate
SELECT
user_id, rank() OVER my_win as rnk, avg(value_2) as avg_val_2
@ -656,6 +667,237 @@ ORDER BY
5 | 5 | {5,5,5} | {5,5}
(66 rows)
-- repeat above 3 tests without grouping by distribution column
SELECT
value_2,
rank() OVER (PARTITION BY value_2 ROWS BETWEEN
UNBOUNDED PRECEDING AND CURRENT ROW),
dense_rank() OVER (PARTITION BY value_2 RANGE BETWEEN
UNBOUNDED PRECEDING AND CURRENT ROW),
CUME_DIST() OVER (PARTITION BY value_2 RANGE BETWEEN
UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING),
PERCENT_RANK() OVER (PARTITION BY value_2 ORDER BY avg(value_1) RANGE BETWEEN
UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)
FROM
users_table
GROUP BY
1
ORDER BY
4 DESC,3 DESC,2 DESC ,1 DESC;
value_2 | rank | dense_rank | cume_dist | percent_rank
---------------------------------------------------------------------
5 | 1 | 1 | 1 | 0
4 | 1 | 1 | 1 | 0
3 | 1 | 1 | 1 | 0
2 | 1 | 1 | 1 | 0
1 | 1 | 1 | 1 | 0
0 | 1 | 1 | 1 | 0
(6 rows)
-- test exclude supported
SELECT
value_2,
value_1,
array_agg(value_1) OVER (PARTITION BY value_2 ORDER BY value_1 RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW),
array_agg(value_1) OVER (PARTITION BY value_2 ORDER BY value_1 RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW EXCLUDE CURRENT ROW)
FROM
users_table
WHERE
value_2 > 2 AND value_2 < 6
ORDER BY
value_2, value_1, 3, 4;
value_2 | value_1 | array_agg | array_agg
---------------------------------------------------------------------
3 | 0 | {0,0,0} | {0,0}
3 | 0 | {0,0,0} | {0,0}
3 | 0 | {0,0,0} | {0,0}
3 | 1 | {0,0,0,1,1,1,1} | {0,0,0,1,1,1}
3 | 1 | {0,0,0,1,1,1,1} | {0,0,0,1,1,1}
3 | 1 | {0,0,0,1,1,1,1} | {0,0,0,1,1,1}
3 | 1 | {0,0,0,1,1,1,1} | {0,0,0,1,1,1}
3 | 2 | {0,0,0,1,1,1,1,2,2} | {0,0,0,1,1,1,1,2}
3 | 2 | {0,0,0,1,1,1,1,2,2} | {0,0,0,1,1,1,1,2}
3 | 3 | {0,0,0,1,1,1,1,2,2,3,3} | {0,0,0,1,1,1,1,2,2,3}
3 | 3 | {0,0,0,1,1,1,1,2,2,3,3} | {0,0,0,1,1,1,1,2,2,3}
3 | 4 | {0,0,0,1,1,1,1,2,2,3,3,4,4,4,4,4} | {0,0,0,1,1,1,1,2,2,3,3,4,4,4,4}
3 | 4 | {0,0,0,1,1,1,1,2,2,3,3,4,4,4,4,4} | {0,0,0,1,1,1,1,2,2,3,3,4,4,4,4}
3 | 4 | {0,0,0,1,1,1,1,2,2,3,3,4,4,4,4,4} | {0,0,0,1,1,1,1,2,2,3,3,4,4,4,4}
3 | 4 | {0,0,0,1,1,1,1,2,2,3,3,4,4,4,4,4} | {0,0,0,1,1,1,1,2,2,3,3,4,4,4,4}
3 | 4 | {0,0,0,1,1,1,1,2,2,3,3,4,4,4,4,4} | {0,0,0,1,1,1,1,2,2,3,3,4,4,4,4}
3 | 5 | {0,0,0,1,1,1,1,2,2,3,3,4,4,4,4,4,5} | {0,0,0,1,1,1,1,2,2,3,3,4,4,4,4,4}
4 | 0 | {0,0} | {0}
4 | 0 | {0,0} | {0}
4 | 1 | {0,0,1,1} | {0,0,1}
4 | 1 | {0,0,1,1} | {0,0,1}
4 | 2 | {0,0,1,1,2,2,2} | {0,0,1,1,2,2}
4 | 2 | {0,0,1,1,2,2,2} | {0,0,1,1,2,2}
4 | 2 | {0,0,1,1,2,2,2} | {0,0,1,1,2,2}
4 | 3 | {0,0,1,1,2,2,2,3,3,3,3,3,3,3} | {0,0,1,1,2,2,2,3,3,3,3,3,3}
4 | 3 | {0,0,1,1,2,2,2,3,3,3,3,3,3,3} | {0,0,1,1,2,2,2,3,3,3,3,3,3}
4 | 3 | {0,0,1,1,2,2,2,3,3,3,3,3,3,3} | {0,0,1,1,2,2,2,3,3,3,3,3,3}
4 | 3 | {0,0,1,1,2,2,2,3,3,3,3,3,3,3} | {0,0,1,1,2,2,2,3,3,3,3,3,3}
4 | 3 | {0,0,1,1,2,2,2,3,3,3,3,3,3,3} | {0,0,1,1,2,2,2,3,3,3,3,3,3}
4 | 3 | {0,0,1,1,2,2,2,3,3,3,3,3,3,3} | {0,0,1,1,2,2,2,3,3,3,3,3,3}
4 | 3 | {0,0,1,1,2,2,2,3,3,3,3,3,3,3} | {0,0,1,1,2,2,2,3,3,3,3,3,3}
4 | 4 | {0,0,1,1,2,2,2,3,3,3,3,3,3,3,4,4,4,4} | {0,0,1,1,2,2,2,3,3,3,3,3,3,3,4,4,4}
4 | 4 | {0,0,1,1,2,2,2,3,3,3,3,3,3,3,4,4,4,4} | {0,0,1,1,2,2,2,3,3,3,3,3,3,3,4,4,4}
4 | 4 | {0,0,1,1,2,2,2,3,3,3,3,3,3,3,4,4,4,4} | {0,0,1,1,2,2,2,3,3,3,3,3,3,3,4,4,4}
4 | 4 | {0,0,1,1,2,2,2,3,3,3,3,3,3,3,4,4,4,4} | {0,0,1,1,2,2,2,3,3,3,3,3,3,3,4,4,4}
4 | 5 | {0,0,1,1,2,2,2,3,3,3,3,3,3,3,4,4,4,4,5,5} | {0,0,1,1,2,2,2,3,3,3,3,3,3,3,4,4,4,4,5}
4 | 5 | {0,0,1,1,2,2,2,3,3,3,3,3,3,3,4,4,4,4,5,5} | {0,0,1,1,2,2,2,3,3,3,3,3,3,3,4,4,4,4,5}
5 | 0 | {0,0} | {0}
5 | 0 | {0,0} | {0}
5 | 1 | {0,0,1} | {0,0}
5 | 2 | {0,0,1,2,2} | {0,0,1,2}
5 | 2 | {0,0,1,2,2} | {0,0,1,2}
5 | 3 | {0,0,1,2,2,3,3,3,3} | {0,0,1,2,2,3,3,3}
5 | 3 | {0,0,1,2,2,3,3,3,3} | {0,0,1,2,2,3,3,3}
5 | 3 | {0,0,1,2,2,3,3,3,3} | {0,0,1,2,2,3,3,3}
5 | 3 | {0,0,1,2,2,3,3,3,3} | {0,0,1,2,2,3,3,3}
5 | 4 | {0,0,1,2,2,3,3,3,3,4,4} | {0,0,1,2,2,3,3,3,3,4}
5 | 4 | {0,0,1,2,2,3,3,3,3,4,4} | {0,0,1,2,2,3,3,3,3,4}
5 | 5 | {0,0,1,2,2,3,3,3,3,4,4,5,5} | {0,0,1,2,2,3,3,3,3,4,4,5}
5 | 5 | {0,0,1,2,2,3,3,3,3,4,4,5,5} | {0,0,1,2,2,3,3,3,3,4,4,5}
(50 rows)
-- test <offset> preceding and <offset> following on RANGE window
SELECT
value_2,
value_1,
array_agg(value_1) OVER range_window,
array_agg(value_1) OVER range_window_exclude
FROM
users_table
WHERE
value_2 > 2 AND value_2 < 6
WINDOW
range_window as (PARTITION BY value_2 ORDER BY value_1 RANGE BETWEEN 1 PRECEDING AND 1 FOLLOWING),
range_window_exclude as (PARTITION BY value_2 ORDER BY value_1 RANGE BETWEEN 1 PRECEDING AND 1 FOLLOWING EXCLUDE CURRENT ROW)
ORDER BY
value_2, value_1, 3, 4;
value_2 | value_1 | array_agg | array_agg
---------------------------------------------------------------------
3 | 0 | {0,0,0,1,1,1,1} | {0,0,1,1,1,1}
3 | 0 | {0,0,0,1,1,1,1} | {0,0,1,1,1,1}
3 | 0 | {0,0,0,1,1,1,1} | {0,0,1,1,1,1}
3 | 1 | {0,0,0,1,1,1,1,2,2} | {0,0,0,1,1,1,2,2}
3 | 1 | {0,0,0,1,1,1,1,2,2} | {0,0,0,1,1,1,2,2}
3 | 1 | {0,0,0,1,1,1,1,2,2} | {0,0,0,1,1,1,2,2}
3 | 1 | {0,0,0,1,1,1,1,2,2} | {0,0,0,1,1,1,2,2}
3 | 2 | {1,1,1,1,2,2,3,3} | {1,1,1,1,2,3,3}
3 | 2 | {1,1,1,1,2,2,3,3} | {1,1,1,1,2,3,3}
3 | 3 | {2,2,3,3,4,4,4,4,4} | {2,2,3,4,4,4,4,4}
3 | 3 | {2,2,3,3,4,4,4,4,4} | {2,2,3,4,4,4,4,4}
3 | 4 | {3,3,4,4,4,4,4,5} | {3,3,4,4,4,4,5}
3 | 4 | {3,3,4,4,4,4,4,5} | {3,3,4,4,4,4,5}
3 | 4 | {3,3,4,4,4,4,4,5} | {3,3,4,4,4,4,5}
3 | 4 | {3,3,4,4,4,4,4,5} | {3,3,4,4,4,4,5}
3 | 4 | {3,3,4,4,4,4,4,5} | {3,3,4,4,4,4,5}
3 | 5 | {4,4,4,4,4,5} | {4,4,4,4,4}
4 | 0 | {0,0,1,1} | {0,1,1}
4 | 0 | {0,0,1,1} | {0,1,1}
4 | 1 | {0,0,1,1,2,2,2} | {0,0,1,2,2,2}
4 | 1 | {0,0,1,1,2,2,2} | {0,0,1,2,2,2}
4 | 2 | {1,1,2,2,2,3,3,3,3,3,3,3} | {1,1,2,2,3,3,3,3,3,3,3}
4 | 2 | {1,1,2,2,2,3,3,3,3,3,3,3} | {1,1,2,2,3,3,3,3,3,3,3}
4 | 2 | {1,1,2,2,2,3,3,3,3,3,3,3} | {1,1,2,2,3,3,3,3,3,3,3}
4 | 3 | {2,2,2,3,3,3,3,3,3,3,4,4,4,4} | {2,2,2,3,3,3,3,3,3,4,4,4,4}
4 | 3 | {2,2,2,3,3,3,3,3,3,3,4,4,4,4} | {2,2,2,3,3,3,3,3,3,4,4,4,4}
4 | 3 | {2,2,2,3,3,3,3,3,3,3,4,4,4,4} | {2,2,2,3,3,3,3,3,3,4,4,4,4}
4 | 3 | {2,2,2,3,3,3,3,3,3,3,4,4,4,4} | {2,2,2,3,3,3,3,3,3,4,4,4,4}
4 | 3 | {2,2,2,3,3,3,3,3,3,3,4,4,4,4} | {2,2,2,3,3,3,3,3,3,4,4,4,4}
4 | 3 | {2,2,2,3,3,3,3,3,3,3,4,4,4,4} | {2,2,2,3,3,3,3,3,3,4,4,4,4}
4 | 3 | {2,2,2,3,3,3,3,3,3,3,4,4,4,4} | {2,2,2,3,3,3,3,3,3,4,4,4,4}
4 | 4 | {3,3,3,3,3,3,3,4,4,4,4,5,5} | {3,3,3,3,3,3,3,4,4,4,5,5}
4 | 4 | {3,3,3,3,3,3,3,4,4,4,4,5,5} | {3,3,3,3,3,3,3,4,4,4,5,5}
4 | 4 | {3,3,3,3,3,3,3,4,4,4,4,5,5} | {3,3,3,3,3,3,3,4,4,4,5,5}
4 | 4 | {3,3,3,3,3,3,3,4,4,4,4,5,5} | {3,3,3,3,3,3,3,4,4,4,5,5}
4 | 5 | {4,4,4,4,5,5} | {4,4,4,4,5}
4 | 5 | {4,4,4,4,5,5} | {4,4,4,4,5}
5 | 0 | {0,0,1} | {0,1}
5 | 0 | {0,0,1} | {0,1}
5 | 1 | {0,0,1,2,2} | {0,0,2,2}
5 | 2 | {1,2,2,3,3,3,3} | {1,2,3,3,3,3}
5 | 2 | {1,2,2,3,3,3,3} | {1,2,3,3,3,3}
5 | 3 | {2,2,3,3,3,3,4,4} | {2,2,3,3,3,4,4}
5 | 3 | {2,2,3,3,3,3,4,4} | {2,2,3,3,3,4,4}
5 | 3 | {2,2,3,3,3,3,4,4} | {2,2,3,3,3,4,4}
5 | 3 | {2,2,3,3,3,3,4,4} | {2,2,3,3,3,4,4}
5 | 4 | {3,3,3,3,4,4,5,5} | {3,3,3,3,4,5,5}
5 | 4 | {3,3,3,3,4,4,5,5} | {3,3,3,3,4,5,5}
5 | 5 | {4,4,5,5} | {4,4,5}
5 | 5 | {4,4,5,5} | {4,4,5}
(50 rows)
-- test <offset> preceding and <offset> following on ROW window
SELECT
value_2,
value_1,
array_agg(value_1) OVER row_window,
array_agg(value_1) OVER row_window_exclude
FROM
users_table
WHERE
value_2 > 2 and value_2 < 6
WINDOW
row_window as (PARTITION BY value_2 ORDER BY value_1 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING),
row_window_exclude as (PARTITION BY value_2 ORDER BY value_1 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING EXCLUDE CURRENT ROW)
ORDER BY
value_2, value_1, 3, 4;
value_2 | value_1 | array_agg | array_agg
---------------------------------------------------------------------
3 | 0 | {0,0} | {0}
3 | 0 | {0,0,0} | {0,0}
3 | 0 | {0,0,1} | {0,1}
3 | 1 | {0,1,1} | {0,1}
3 | 1 | {1,1,1} | {1,1}
3 | 1 | {1,1,1} | {1,1}
3 | 1 | {1,1,2} | {1,2}
3 | 2 | {1,2,2} | {1,2}
3 | 2 | {2,2,3} | {2,3}
3 | 3 | {2,3,3} | {2,3}
3 | 3 | {3,3,4} | {3,4}
3 | 4 | {3,4,4} | {3,4}
3 | 4 | {4,4,4} | {4,4}
3 | 4 | {4,4,4} | {4,4}
3 | 4 | {4,4,4} | {4,4}
3 | 4 | {4,4,5} | {4,5}
3 | 5 | {4,5} | {4}
4 | 0 | {0,0} | {0}
4 | 0 | {0,0,1} | {0,1}
4 | 1 | {0,1,1} | {0,1}
4 | 1 | {1,1,2} | {1,2}
4 | 2 | {1,2,2} | {1,2}
4 | 2 | {2,2,2} | {2,2}
4 | 2 | {2,2,3} | {2,3}
4 | 3 | {2,3,3} | {2,3}
4 | 3 | {3,3,3} | {3,3}
4 | 3 | {3,3,3} | {3,3}
4 | 3 | {3,3,3} | {3,3}
4 | 3 | {3,3,3} | {3,3}
4 | 3 | {3,3,3} | {3,3}
4 | 3 | {3,3,4} | {3,4}
4 | 4 | {3,4,4} | {3,4}
4 | 4 | {4,4,4} | {4,4}
4 | 4 | {4,4,4} | {4,4}
4 | 4 | {4,4,5} | {4,5}
4 | 5 | {4,5,5} | {4,5}
4 | 5 | {5,5} | {5}
5 | 0 | {0,0} | {0}
5 | 0 | {0,0,1} | {0,1}
5 | 1 | {0,1,2} | {0,2}
5 | 2 | {1,2,2} | {1,2}
5 | 2 | {2,2,3} | {2,3}
5 | 3 | {2,3,3} | {2,3}
5 | 3 | {3,3,3} | {3,3}
5 | 3 | {3,3,3} | {3,3}
5 | 3 | {3,3,4} | {3,4}
5 | 4 | {3,4,4} | {3,4}
5 | 4 | {4,4,5} | {4,5}
5 | 5 | {4,5,5} | {4,5}
5 | 5 | {5,5} | {5}
(50 rows)
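The three blocks above pin down frame semantics that must stay identical whether the window runs on the workers or the coordinator. The distinction in brief, over a hypothetical one-column table t(v):
-- RANGE frames admit all peers whose sort value lies within the
-- offset, so equal values of v enter and leave the frame together;
-- ROWS frames count physical rows, splitting groups of duplicates;
-- EXCLUDE CURRENT ROW removes only the row being computed
SELECT v,
       array_agg(v) OVER (ORDER BY v RANGE BETWEEN 1 PRECEDING AND 1 FOLLOWING),
       array_agg(v) OVER (ORDER BY v ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING),
       array_agg(v) OVER (ORDER BY v ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING
                          EXCLUDE CURRENT ROW)
FROM t;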
-- some tests with GROUP BY, HAVING and LIMIT
SELECT
user_id, sum(event_type) OVER my_win , event_type
@ -678,6 +920,27 @@ LIMIT
5 | 3 | 3
(5 rows)
-- test PARTITION BY avg(...) ORDER BY avg(...)
SELECT
value_1,
avg(value_3),
dense_rank() OVER (PARTITION BY avg(value_3) ORDER BY avg(value_2))
FROM
users_table
GROUP BY
1
ORDER BY
1;
value_1 | avg | dense_rank
---------------------------------------------------------------------
0 | 3.08333333333333 | 1
1 | 2.93333333333333 | 1
2 | 2.22222222222222 | 1
3 | 2.73076923076923 | 1
4 | 2.9047619047619 | 1
5 | 2.22222222222222 | 2
(6 rows)
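Here both the PARTITION BY and the ORDER BY of the window are aggregates, which forces a two-pass evaluation: grouping first, windowing second. Annotated, the same query:
-- pass 1: GROUP BY value_1 yields one row per group, carrying
--         avg(value_3) and avg(value_2)
-- pass 2: dense_rank() partitions those grouped rows by avg(value_3)
--         and ranks them by avg(value_2)
SELECT value_1,
       dense_rank() OVER (PARTITION BY avg(value_3) ORDER BY avg(value_2))
FROM users_table
GROUP BY value_1;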
-- Group by has more columns than partition by
SELECT
DISTINCT user_id, SUM(value_2) OVER (PARTITION BY user_id)
@ -768,10 +1031,89 @@ $Q$);
---------------------------------------------------------------------
Sort
Sort Key: remote_scan.avg_1 DESC, remote_scan.avg DESC, remote_scan.user_id DESC
-> HashAggregate
Group Key: remote_scan.user_id
-> Custom Scan (Citus Adaptive)
Task Count: 4
-> Custom Scan (Citus Adaptive)
Task Count: 4
(4 rows)
SELECT
value_2,
AVG(avg(value_1)) OVER (PARTITION BY value_2, max(value_2), MIN(value_2)),
AVG(avg(value_2)) OVER (PARTITION BY value_2, min(value_2), AVG(value_1))
FROM
users_table
GROUP BY
1
ORDER BY
3 DESC, 2 DESC, 1 DESC;
value_2 | avg | avg
---------------------------------------------------------------------
5 | 2.6923076923076923 | 5.0000000000000000
4 | 2.7500000000000000 | 4.0000000000000000
3 | 2.2941176470588235 | 3.0000000000000000
2 | 2.7619047619047619 | 2.0000000000000000
1 | 2.4285714285714286 | 1.00000000000000000000
0 | 2.2222222222222222 | 0.00000000000000000000
(6 rows)
SELECT
value_2, user_id,
AVG(avg(value_1)) OVER (PARTITION BY value_2, max(value_2), MIN(value_2)),
AVG(avg(value_2)) OVER (PARTITION BY user_id, min(value_2), AVG(value_1))
FROM
users_table
GROUP BY
1, 2
ORDER BY
3 DESC, 2 DESC, 1 DESC;
value_2 | user_id | avg | avg
---------------------------------------------------------------------
5 | 5 | 2.6666666666666667 | 5.0000000000000000
5 | 4 | 2.6666666666666667 | 5.0000000000000000
5 | 3 | 2.6666666666666667 | 5.0000000000000000
5 | 2 | 2.6666666666666667 | 5.0000000000000000
2 | 6 | 2.54583333333333333333 | 2.0000000000000000
2 | 5 | 2.54583333333333333333 | 2.0000000000000000
2 | 4 | 2.54583333333333333333 | 2.0000000000000000
2 | 3 | 2.54583333333333333333 | 2.0000000000000000
2 | 2 | 2.54583333333333333333 | 2.0000000000000000
2 | 1 | 2.54583333333333333333 | 2.0000000000000000
0 | 6 | 2.50000000000000000000 | 0.00000000000000000000
0 | 5 | 2.50000000000000000000 | 0.00000000000000000000
0 | 4 | 2.50000000000000000000 | 0.00000000000000000000
0 | 2 | 2.50000000000000000000 | 0.00000000000000000000
0 | 1 | 2.50000000000000000000 | 0.00000000000000000000
4 | 6 | 2.45555555555555555000 | 4.0000000000000000
4 | 5 | 2.45555555555555555000 | 4.0000000000000000
4 | 4 | 2.45555555555555555000 | 4.0000000000000000
4 | 3 | 2.45555555555555555000 | 4.0000000000000000
4 | 2 | 2.45555555555555555000 | 4.0000000000000000
4 | 1 | 2.45555555555555555000 | 4.0000000000000000
3 | 6 | 2.3500000000000000 | 3.0000000000000000
3 | 5 | 2.3500000000000000 | 3.0000000000000000
3 | 4 | 2.3500000000000000 | 3.0000000000000000
3 | 3 | 2.3500000000000000 | 3.0000000000000000
3 | 2 | 2.3500000000000000 | 3.0000000000000000
3 | 1 | 2.3500000000000000 | 3.0000000000000000
1 | 6 | 1.90666666666666666000 | 1.00000000000000000000
1 | 5 | 1.90666666666666666000 | 1.00000000000000000000
1 | 4 | 1.90666666666666666000 | 1.00000000000000000000
1 | 3 | 1.90666666666666666000 | 1.00000000000000000000
1 | 2 | 1.90666666666666666000 | 1.00000000000000000000
(32 rows)
SELECT user_id, sum(avg(user_id)) OVER ()
FROM users_table
GROUP BY user_id
ORDER BY 1
LIMIT 10;
user_id | sum
---------------------------------------------------------------------
1 | 21.00000000000000000000
2 | 21.00000000000000000000
3 | 21.00000000000000000000
4 | 21.00000000000000000000
5 | 21.00000000000000000000
6 | 21.00000000000000000000
(6 rows)
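The constant 21 is easy to verify by hand: grouped by user_id, avg(user_id) is simply user_id, and an empty OVER () makes every output row a peer of all the others, so each row carries 1 + 2 + 3 + 4 + 5 + 6 = 21. The same reasoning as comments:
-- avg(user_id) per user_id group collapses to user_id itself;
-- OVER () with no PARTITION BY spans the whole six-row result,
-- so sum(...) OVER () = 1 + 2 + 3 + 4 + 5 + 6 = 21 on every row
SELECT user_id, sum(avg(user_id)) OVER ()
FROM users_table
GROUP BY user_id;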
SELECT
@ -939,24 +1281,22 @@ FROM
users_table
GROUP BY user_id, value_2
ORDER BY user_id, avg(value_1) DESC;
QUERY PLAN
QUERY PLAN
---------------------------------------------------------------------
Sort
Sort Key: remote_scan.user_id, ((pg_catalog.sum(remote_scan.avg) / pg_catalog.sum(remote_scan.avg_1))) DESC
-> HashAggregate
Group Key: remote_scan.user_id, remote_scan.worker_column_5
-> Custom Scan (Citus Adaptive)
Task Count: 4
Tasks Shown: One of 4
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> WindowAgg
-> Sort
Sort Key: users_table.user_id, (('1'::numeric / ('1'::numeric + avg(users_table.value_1))))
-> HashAggregate
Group Key: users_table.user_id, users_table.value_2
-> Seq Scan on users_table_1400256 users_table
(15 rows)
Sort Key: remote_scan.user_id, remote_scan.avg DESC
-> Custom Scan (Citus Adaptive)
Task Count: 4
Tasks Shown: One of 4
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> WindowAgg
-> Sort
Sort Key: users_table.user_id, (('1'::numeric / ('1'::numeric + avg(users_table.value_1))))
-> HashAggregate
Group Key: users_table.user_id, users_table.value_2
-> Seq Scan on users_table_1400256 users_table
(13 rows)
-- order by in the window function is same as avg(value_1) DESC
SELECT
@ -1014,25 +1354,23 @@ FROM
GROUP BY user_id, value_2
ORDER BY user_id, avg(value_1) DESC
LIMIT 5;
QUERY PLAN
QUERY PLAN
---------------------------------------------------------------------
Limit
-> Sort
Sort Key: remote_scan.user_id, ((pg_catalog.sum(remote_scan.avg) / pg_catalog.sum(remote_scan.avg_1))) DESC
-> HashAggregate
Group Key: remote_scan.user_id, remote_scan.worker_column_5
-> Custom Scan (Citus Adaptive)
Task Count: 4
Tasks Shown: One of 4
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> WindowAgg
-> Sort
Sort Key: users_table.user_id, (('1'::numeric / ('1'::numeric + avg(users_table.value_1))))
-> HashAggregate
Group Key: users_table.user_id, users_table.value_2
-> Seq Scan on users_table_1400256 users_table
(16 rows)
Sort Key: remote_scan.user_id, remote_scan.avg DESC
-> Custom Scan (Citus Adaptive)
Task Count: 4
Tasks Shown: One of 4
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> WindowAgg
-> Sort
Sort Key: users_table.user_id, (('1'::numeric / ('1'::numeric + avg(users_table.value_1))))
-> HashAggregate
Group Key: users_table.user_id, users_table.value_2
-> Seq Scan on users_table_1400256 users_table
(14 rows)
EXPLAIN (COSTS FALSE)
SELECT
@ -1044,25 +1382,23 @@ FROM
GROUP BY user_id, value_2
ORDER BY user_id, avg(value_1) DESC
LIMIT 5;
QUERY PLAN
QUERY PLAN
---------------------------------------------------------------------
Limit
-> Sort
Sort Key: remote_scan.user_id, ((pg_catalog.sum(remote_scan.avg) / pg_catalog.sum(remote_scan.avg_1))) DESC
-> HashAggregate
Group Key: remote_scan.user_id, remote_scan.worker_column_5
-> Custom Scan (Citus Adaptive)
Task Count: 4
Tasks Shown: One of 4
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> WindowAgg
-> Sort
Sort Key: users_table.user_id, (('1'::numeric / ('1'::numeric + avg(users_table.value_1))))
-> HashAggregate
Group Key: users_table.user_id, users_table.value_2
-> Seq Scan on users_table_1400256 users_table
(16 rows)
Sort Key: remote_scan.user_id, remote_scan.avg DESC
-> Custom Scan (Citus Adaptive)
Task Count: 4
Tasks Shown: One of 4
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> WindowAgg
-> Sort
Sort Key: users_table.user_id, (('1'::numeric / ('1'::numeric + avg(users_table.value_1))))
-> HashAggregate
Group Key: users_table.user_id, users_table.value_2
-> Seq Scan on users_table_1400256 users_table
(14 rows)
EXPLAIN (COSTS FALSE)
SELECT
@ -1074,25 +1410,23 @@ FROM
GROUP BY user_id, value_2
ORDER BY user_id, avg(value_1) DESC
LIMIT 5;
QUERY PLAN
QUERY PLAN
---------------------------------------------------------------------
Limit
-> Sort
Sort Key: remote_scan.user_id, ((pg_catalog.sum(remote_scan.avg) / pg_catalog.sum(remote_scan.avg_1))) DESC
-> HashAggregate
Group Key: remote_scan.user_id, remote_scan.worker_column_5
-> Custom Scan (Citus Adaptive)
Task Count: 4
Tasks Shown: One of 4
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> WindowAgg
-> Sort
Sort Key: users_table.user_id, ((1 / (1 + sum(users_table.value_2))))
-> HashAggregate
Group Key: users_table.user_id, users_table.value_2
-> Seq Scan on users_table_1400256 users_table
(16 rows)
Sort Key: remote_scan.user_id, remote_scan.avg DESC
-> Custom Scan (Citus Adaptive)
Task Count: 4
Tasks Shown: One of 4
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> WindowAgg
-> Sort
Sort Key: users_table.user_id, ((1 / (1 + sum(users_table.value_2))))
-> HashAggregate
Group Key: users_table.user_id, users_table.value_2
-> Seq Scan on users_table_1400256 users_table
(14 rows)
EXPLAIN (COSTS FALSE)
SELECT
@ -1104,23 +1438,117 @@ FROM
GROUP BY user_id, value_2
ORDER BY user_id, avg(value_1) DESC
LIMIT 5;
QUERY PLAN
QUERY PLAN
---------------------------------------------------------------------
Limit
-> Sort
Sort Key: remote_scan.user_id, ((pg_catalog.sum(remote_scan.avg) / pg_catalog.sum(remote_scan.avg_1))) DESC
-> HashAggregate
Group Key: remote_scan.user_id, remote_scan.worker_column_5
Sort Key: remote_scan.user_id, remote_scan.avg DESC
-> Custom Scan (Citus Adaptive)
Task Count: 4
Tasks Shown: One of 4
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> WindowAgg
-> Sort
Sort Key: users_table.user_id, (sum(users_table.value_2))
-> HashAggregate
Group Key: users_table.user_id, users_table.value_2
-> Seq Scan on users_table_1400256 users_table
(14 rows)
-- Grouping can be pushed down with aggregates even when window function can't
EXPLAIN (COSTS FALSE)
SELECT user_id, count(value_1), stddev(value_1), count(user_id) OVER (PARTITION BY random())
FROM users_table GROUP BY user_id HAVING avg(value_1) > 2 LIMIT 1;
QUERY PLAN
---------------------------------------------------------------------
Limit
-> WindowAgg
-> Sort
Sort Key: remote_scan.worker_column_5
-> Custom Scan (Citus Adaptive)
Task Count: 4
Tasks Shown: One of 4
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> WindowAgg
-> Sort
Sort Key: users_table.user_id, (sum(users_table.value_2))
-> HashAggregate
Group Key: users_table.user_id, users_table.value_2
-> Seq Scan on users_table_1400256 users_table
(16 rows)
-> HashAggregate
Group Key: user_id
Filter: (avg(value_1) > '2'::numeric)
-> Seq Scan on users_table_1400256 users_table
(13 rows)
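This plan shows the split the commit enables: the GROUP BY and HAVING are finalized on the workers (user_id is the distribution column), while the window function, whose PARTITION BY random() can never be pushed down, runs on the coordinator over the merged rows. In outline:
-- workers:     HashAggregate, Group Key: user_id,
--              Filter: avg(value_1) > 2
-- coordinator: Sort on the synthetic partition key, then WindowAgg
--              for count(user_id) OVER (PARTITION BY random())
SELECT user_id, count(value_1), stddev(value_1),
       count(user_id) OVER (PARTITION BY random())
FROM users_table
GROUP BY user_id
HAVING avg(value_1) > 2;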
-- Window function with inlined CTE
WITH cte as (
SELECT uref.id user_id, events_table.value_2, count(*) c
FROM events_table
JOIN users_ref_test_table uref ON uref.id = events_table.user_id
GROUP BY 1, 2
)
SELECT DISTINCT cte.value_2, cte.c, sum(cte.value_2) OVER (PARTITION BY cte.c)
FROM cte JOIN events_table et ON et.value_2 = cte.value_2 and et.value_2 = cte.c
ORDER BY 1;
value_2 | c | sum
---------------------------------------------------------------------
3 | 3 | 108
4 | 4 | 56
(2 rows)
-- There was a strange bug where this query's window function would not be pushed down
-- The bug depended on column ordering
CREATE TABLE daily_uniques (value_2 float, user_id bigint);
SELECT create_distributed_table('daily_uniques', 'user_id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
EXPLAIN (COSTS FALSE) SELECT
user_id,
sum(value_2) AS commits,
RANK () OVER (
PARTITION BY user_id
ORDER BY
sum(value_2) DESC
)
FROM daily_uniques
GROUP BY user_id
HAVING
sum(value_2) > 0
ORDER BY commits DESC
LIMIT 10;
QUERY PLAN
---------------------------------------------------------------------
Limit
-> Sort
Sort Key: remote_scan.commits DESC
-> Custom Scan (Citus Adaptive)
Task Count: 4
Tasks Shown: One of 4
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> WindowAgg
-> Sort
Sort Key: daily_uniques.user_id, (sum(daily_uniques.value_2)) DESC
-> HashAggregate
Group Key: daily_uniques.user_id
Filter: (sum(daily_uniques.value_2) > '0'::double precision)
-> Seq Scan on daily_uniques_xxxxxxx daily_uniques
(15 rows)
DROP TABLE daily_uniques;
-- Partition by reference table column joined to distribution column
SELECT DISTINCT value_2, array_agg(rnk ORDER BY rnk) FROM (
SELECT events_table.value_2, sum(uref.k_no) OVER (PARTITION BY uref.id) AS rnk
FROM events_table
JOIN users_ref_test_table uref ON uref.id = events_table.user_id) sq
GROUP BY 1 ORDER BY 1;
value_2 | array_agg
---------------------------------------------------------------------
0 | {686,686,816,816,987,987,1104}
1 | {500,500,675,675,675,686,686,816,816,816,987,987,987,987,987,1104,1104,1104,1104,1104,1104,1104}
2 | {500,500,500,500,675,675,675,675,675,686,686,686,686,816,816,816,816,816,987,987,987,987,987,987,987,1104,1104,1104,1104,1104,1104}
3 | {500,500,500,500,675,686,686,686,816,816,987,987,987,1104,1104,1104,1104,1104}
4 | {675,675,675,675,686,686,686,816,816,816,987,987,1104,1104}
5 | {675,675,816,816,987,987,1104,1104,1104}
(6 rows)
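The query above still pushes down because the join clause equates uref.id with the distribution column, making PARTITION BY uref.id equivalent to partitioning by events_table.user_id. The pattern, reduced:
-- uref.id = events_table.user_id lets the planner treat the
-- reference-table column as a distribution-column partition,
-- so the window function runs on the workers
SELECT events_table.value_2,
       sum(uref.k_no) OVER (PARTITION BY uref.id)
FROM events_table
JOIN users_ref_test_table uref ON uref.id = events_table.user_id;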

View File

@ -461,8 +461,15 @@ ORDER BY
user_id
LIMIT
5;
ERROR: could not run distributed query because the window function that is used cannot be pushed down
HINT: Window functions are supported in two ways. Either add an equality filter on the distributed tables' partition column or use the window functions with a PARTITION BY clause containing the distribution column
user_id
---------------------------------------------------------------------
6
6
6
6
6
(5 rows)
-- Window functions that partition by the distribution column in subqueries in CTEs are ok
WITH top_users AS
(SELECT *

View File

@ -60,14 +60,15 @@ WHERE day >= '2018-06-23' AND day <= '2018-07-01'
GROUP BY 1
ORDER BY 1;
-- These are going to be supported after window function support
SELECT day, hll_cardinality(hll_union_agg(unique_users) OVER seven_days)
FROM daily_uniques
WINDOW seven_days AS (ORDER BY day ASC ROWS 6 PRECEDING);
WINDOW seven_days AS (ORDER BY day ASC ROWS 6 PRECEDING)
ORDER BY 1;
SELECT day, (hll_cardinality(hll_union_agg(unique_users) OVER two_days)) - hll_cardinality(unique_users) AS lost_uniques
FROM daily_uniques
WINDOW two_days AS (ORDER BY day ASC ROWS 1 PRECEDING);
WINDOW two_days AS (ORDER BY day ASC ROWS 1 PRECEDING)
ORDER BY 1;
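These two queries pair an unpushable window with the hll extension's mergeable sketches: hll_union_agg merges the per-day sketches inside the frame and hll_cardinality estimates the distinct count of the union. The seven-day variant with an inline window, as a usage sketch:
-- sliding 7-day unique-user estimate: the frame covers the current
-- day plus the six preceding days, then the merged HLL is estimated
SELECT day,
       hll_cardinality(hll_union_agg(unique_users)
                       OVER (ORDER BY day ASC ROWS 6 PRECEDING))
FROM daily_uniques
ORDER BY 1;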
-- Test disabling hash_agg on coordinator query
SET citus.explain_all_tasks to true;
@ -219,7 +220,6 @@ SELECT (topn(topn_add_agg(user_id::text), 10)).*
FROM customer_reviews
ORDER BY 2 DESC, 1;
-- The following is going to be supported after window function support
SELECT day, (topn(agg, 10)).*
FROM (
SELECT day, topn_union_agg(reviewers) OVER seven_days AS agg

View File

@ -181,6 +181,30 @@ ORDER BY
customer_keys.o_custkey DESC
LIMIT 10 OFFSET 20;
SELECT
customer_keys.o_custkey,
SUM(order_count) AS total_order_count
FROM
(SELECT o_custkey, o_orderstatus, COUNT(*) over (partition by o_orderstatus) AS order_count
FROM orders GROUP BY o_custkey, o_orderstatus ) customer_keys
GROUP BY
customer_keys.o_custkey
ORDER BY
customer_keys.o_custkey DESC
LIMIT 10 OFFSET 20;
SELECT
customer_keys.o_custkey,
SUM(order_count1 + order_count) AS total_order_count
FROM
(SELECT o_custkey, o_orderstatus, count(*) order_count1, COUNT(*) over (partition by o_orderstatus) AS order_count
FROM orders GROUP BY o_custkey, o_orderstatus ) customer_keys
GROUP BY
customer_keys.o_custkey
ORDER BY
customer_keys.o_custkey DESC
LIMIT 10 OFFSET 20;
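Both new tests nest a window function inside a grouped subquery and then aggregate over its output. Note the evaluation order: the window runs after the inner GROUP BY, so COUNT(*) OVER (PARTITION BY o_orderstatus) counts grouped (o_custkey, o_orderstatus) rows per status, and the outer query re-aggregates those counts. Stripped down:
-- inner: one row per (o_custkey, o_orderstatus), annotated with the
--        number of grouped rows sharing its o_orderstatus
-- outer: sums those partition sizes back up per customer
SELECT o_custkey, SUM(order_count) AS total_order_count
FROM
  (SELECT o_custkey, o_orderstatus,
          COUNT(*) OVER (PARTITION BY o_orderstatus) AS order_count
   FROM orders GROUP BY o_custkey, o_orderstatus) customer_keys
GROUP BY o_custkey;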
RESET citus.task_executor_type;
SET client_min_messages TO DEBUG1;

View File

@ -561,7 +561,7 @@ FROM (
SELECT count(*), count(DISTINCT user_id), avg(user_id) FROM agg_results_window;
TRUNCATE agg_results_window;
-- lets have some queries that Citus shouldn't push down
-- test queries where the window function isn't pushed down
INSERT INTO agg_results_window (user_id, agg_time, value_2_agg)
SELECT
user_id, time, rnk
@ -578,8 +578,6 @@ ORDER BY
LIMIT
10;
-- user needs to supply partition by which should
-- include the distribution key
INSERT INTO agg_results_window (user_id, agg_time, value_2_agg)
SELECT
user_id, time, rnk
@ -596,8 +594,6 @@ ORDER BY
LIMIT
10;
-- user needs to supply partition by which should
-- include the distribution key
INSERT INTO agg_results_window (user_id, agg_time, value_2_agg)
SELECT
user_id, time, rnk
@ -614,7 +610,6 @@ ORDER BY
LIMIT
10;
-- w2 should not be pushed down
INSERT INTO agg_results_window (user_id, value_1_agg, value_2_agg)
SELECT * FROM
(
@ -631,7 +626,6 @@ SELECT * FROM
LIMIT
10;
-- GROUP BY includes the partition key, but not the WINDOW function
INSERT INTO agg_results_window (user_id, agg_time, value_2_agg)
SELECT
user_id, time, my_rank
@ -648,7 +642,6 @@ FROM
WHERE
my_rank > 125;
-- GROUP BY includes the partition key, but not the WINDOW function
INSERT INTO agg_results_window (user_id, agg_time, value_2_agg)
SELECT
user_id, time, my_rank
@ -665,7 +658,6 @@ FROM
WHERE
my_rank > 125;
-- w2 should not be allowed
INSERT INTO agg_results_window (user_id, value_2_agg, value_3_agg)
SELECT * FROM
(
@ -680,7 +672,6 @@ SELECT * FROM
w2 AS (ORDER BY events_table.time)
) as foo;
-- unsupported window function with an override
INSERT INTO agg_results_window(user_id, agg_time, value_2_agg)
SELECT * FROM (
SELECT
@ -697,7 +688,6 @@ SELECT * FROM (
w2 as (PARTITION BY user_id, time)
) a;
-- Subquery in where with unsupported window function
INSERT INTO agg_results_window(user_id)
SELECT
user_id
@ -716,7 +706,6 @@ WHERE
GROUP BY
user_id;
-- Aggregate function on distribution column should error out
INSERT INTO agg_results_window(user_id, value_2_agg)
SELECT * FROM (
SELECT
@ -727,8 +716,6 @@ SELECT * FROM (
user_id
) a;
-- UNION with only one subquery which has a partition on non-distribution column should
-- error out
INSERT INTO agg_results_window(user_id, value_1_agg)
SELECT *
FROM (

View File

@ -501,14 +501,15 @@ SELECT word_count, rank() OVER (PARTITION BY author_id ORDER BY word_count)
FROM articles_hash_mx
WHERE author_id = 1;
-- window functions are not supported for not router plannable queries
SELECT id, MIN(id) over (order by word_count)
FROM articles_hash_mx
WHERE author_id = 1 or author_id = 2;
WHERE author_id = 1 or author_id = 2
ORDER BY 1;
SELECT LAG(title, 1) over (ORDER BY word_count) prev, title, word_count
FROM articles_hash_mx
WHERE author_id = 5 or author_id = 2;
WHERE author_id = 5 or author_id = 2
ORDER BY 2;
-- complex query hitting a single shard
SELECT

View File

@ -572,7 +572,6 @@ SELECT *
FROM articles_hash
WHERE (title like '%s' or title like 'a%') and (author_id = 1) and (word_count < 3000 or word_count > 8000);
-- window functions are supported if query is router plannable
SELECT LAG(title, 1) over (ORDER BY word_count) prev, title, word_count
FROM articles_hash
WHERE author_id = 5;
@ -594,14 +593,15 @@ SELECT word_count, rank() OVER (PARTITION BY author_id ORDER BY word_count)
FROM articles_hash
WHERE author_id = 1;
-- window functions are not supported for not router plannable queries
SELECT id, MIN(id) over (order by word_count)
FROM articles_hash
WHERE author_id = 1 or author_id = 2;
WHERE author_id = 1 or author_id = 2
ORDER BY 1;
SELECT LAG(title, 1) over (ORDER BY word_count) prev, title, word_count
FROM articles_hash
WHERE author_id = 5 or author_id = 2;
WHERE author_id = 5 or author_id = 2
ORDER BY 2;
-- where false queries are router plannable
SELECT *

View File

@ -396,13 +396,13 @@ EXPLAIN (COSTS FALSE)
SELECT DISTINCT ON (l_orderkey) l_orderkey, l_partkey, l_suppkey
FROM lineitem_hash_part
WHERE l_orderkey < 35
ORDER BY 1;
ORDER BY 1, 2, 3;
EXPLAIN (COSTS FALSE)
SELECT DISTINCT ON (l_orderkey) l_orderkey, l_partkey, l_suppkey
FROM lineitem_hash_part
WHERE l_orderkey < 35
ORDER BY 1;
ORDER BY 1, 2, 3;
-- distinct on non-partition column
-- note order by is required here

View File

@ -476,7 +476,7 @@ EXPLAIN (COSTS FALSE, VERBOSE TRUE)
ORDER BY 2 DESC, 1 DESC
LIMIT 5;
-- lets have some queries that Citus shouldn't push down
-- test with window functions which aren't pushed down
SELECT
user_id, time, rnk
FROM
@ -492,8 +492,6 @@ ORDER BY
LIMIT
10;
-- user needs to supply partition by which should
-- include the distribution key
SELECT
user_id, time, rnk
FROM
@ -509,8 +507,6 @@ ORDER BY
LIMIT
10;
-- user needs to supply partition by which should
-- include the distribution key
SELECT
user_id, time, rnk
FROM
@ -526,7 +522,6 @@ ORDER BY
LIMIT
10;
-- w2 should not be pushed down
SELECT * FROM
(
SELECT
@ -542,7 +537,6 @@ SELECT * FROM
ORDER BY 3 DESC, 1 DESC, 2 DESC NULLS LAST
LIMIT 10;
-- w2 should not be pushed down
SELECT * FROM
(
SELECT
@ -560,7 +554,6 @@ ORDER BY
LIMIT
10;
-- GROUP BY includes the partition key, but not the WINDOW function
SELECT
user_id, time, my_rank
FROM
@ -580,7 +573,6 @@ ORDER BY
LIMIT
10;
-- GROUP BY includes the partition key, but not the WINDOW function
SELECT
user_id, time, my_rank
FROM
@ -600,7 +592,6 @@ ORDER BY
LIMIT
10;
-- Overriding window function but not supported
SELECT * FROM (
SELECT
user_id, date_trunc('day', time) as time, sum(rank) OVER w2
@ -619,7 +610,6 @@ ORDER BY
1,2,3;
-- Aggregate function on distribution column should error out
SELECT * FROM (
SELECT
user_id, COUNT(*) OVER (PARTITION BY sum(user_id), MIN(value_2))
@ -645,8 +635,6 @@ ORDER BY
LIMIT
20;
-- UNION ALL with only one of them is not partitioned over distribution column which
-- should not be allowed.
SELECT
max(avg)
FROM
@ -667,8 +655,6 @@ GROUP BY user_id
ORDER BY 1 DESC
LIMIT 5;
-- UNION with only one subquery which has a partition on non-distribution column should
-- error out
SELECT *
FROM (
( SELECT user_id,

View File

@ -104,16 +104,17 @@ SELECT * FROM ((SELECT x,y FROM test) UNION (SELECT y,x FROM test)) foo WHERE x
-- set operations and the sublink can be recursively planned
SELECT * FROM ((SELECT x,y FROM test) UNION (SELECT y,x FROM test)) foo WHERE x IN (SELECT y FROM test) ORDER BY 1;
-- set operations works fine with pushdownable window functions
-- set operations work fine with pushdownable window functions
SELECT x, y, rnk FROM (SELECT *, rank() OVER my_win as rnk FROM test WINDOW my_win AS (PARTITION BY x ORDER BY y DESC)) as foo
UNION
SELECT x, y, rnk FROM (SELECT *, rank() OVER my_win as rnk FROM test WINDOW my_win AS (PARTITION BY x ORDER BY y DESC)) as bar
ORDER BY 1 DESC, 2 DESC, 3 DESC;
-- set operations errors out with non-pushdownable window functions
-- set operations work fine with non-pushdownable window functions
SELECT x, y, rnk FROM (SELECT *, rank() OVER my_win as rnk FROM test WINDOW my_win AS (PARTITION BY y ORDER BY x DESC)) as foo
UNION
SELECT x, y, rnk FROM (SELECT *, rank() OVER my_win as rnk FROM test WINDOW my_win AS (PARTITION BY y ORDER BY x DESC)) as bar;
SELECT x, y, rnk FROM (SELECT *, rank() OVER my_win as rnk FROM test WINDOW my_win AS (PARTITION BY y ORDER BY x DESC)) as bar
ORDER BY 1 DESC, 2 DESC, 3 DESC;
-- other set operations in joins also cannot be pushed down
SELECT * FROM ((SELECT * FROM test) EXCEPT (SELECT * FROM test ORDER BY x LIMIT 1)) u JOIN test USING (x) ORDER BY 1,2;

View File

@ -2,12 +2,12 @@
-- test recursive planning functionality with subqueries and CTEs
-- ===================================================================
CREATE SCHEMA subquery_deep;
SET search_path TO subquery_and_ctes, public;
SET search_path TO subquery_deep, public;
SET client_min_messages TO DEBUG1;
-- subquery in FROM -> FROM -> FROM should be replaced due to OFFSET
-- one level up subquery should be replaced due to GROUP BY on non partition key
-- one level up subquery should be replaced due to LIMUT
-- one level up subquery should be replaced due to LIMIT
SELECT
DISTINCT user_id
FROM

View File

@ -79,27 +79,6 @@ FROM
ORDER BY 1 DESC;
SET citus.enable_router_execution TO true;
-- window functions are not allowed if they're not partitioned on the distribution column
SELECT
*
FROM
(
SELECT
user_id, time, rnk
FROM
(
SELECT
*, rank() OVER my_win as rnk
FROM
events_table
WINDOW my_win AS (PARTITION BY event_type ORDER BY time DESC)
) as foo
ORDER BY
3 DESC, 1 DESC, 2 DESC
LIMIT
10) as foo;
-- OUTER JOINs where the outer part is recursively planned, and not the other way
-- around, are not supported
SELECT

View File

@ -136,8 +136,7 @@ ORDER BY
rnk DESC, 1 DESC
LIMIT 10;
-- similar query with no distribution column is on the partition by clause
-- is not supported
-- similar query with no distribution column on the partition by clause
SELECT
DISTINCT ON (events_table.user_id, rnk) events_table.user_id, rank() OVER my_win AS rnk
FROM
@ -270,6 +269,69 @@ WINDOW
ORDER BY
user_id, value_1, 3, 4;
-- repeat the above 3 tests without grouping by the distribution column
SELECT
value_2,
rank() OVER (PARTITION BY value_2 ROWS BETWEEN
UNBOUNDED PRECEDING AND CURRENT ROW),
dense_rank() OVER (PARTITION BY value_2 RANGE BETWEEN
UNBOUNDED PRECEDING AND CURRENT ROW),
CUME_DIST() OVER (PARTITION BY value_2 RANGE BETWEEN
UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING),
PERCENT_RANK() OVER (PARTITION BY value_2 ORDER BY avg(value_1) RANGE BETWEEN
UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)
FROM
users_table
GROUP BY
1
ORDER BY
4 DESC, 3 DESC, 2 DESC, 1 DESC;
-- test that the EXCLUDE frame clause is supported
SELECT
value_2,
value_1,
array_agg(value_1) OVER (PARTITION BY value_2 ORDER BY value_1 RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW),
array_agg(value_1) OVER (PARTITION BY value_2 ORDER BY value_1 RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW EXCLUDE CURRENT ROW)
FROM
users_table
WHERE
value_2 > 2 AND value_2 < 6
ORDER BY
value_2, value_1, 3, 4;
-- test <offset> preceding and <offset> following on RANGE window
SELECT
value_2,
value_1,
array_agg(value_1) OVER range_window,
array_agg(value_1) OVER range_window_exclude
FROM
users_table
WHERE
value_2 > 2 AND value_2 < 6
WINDOW
range_window as (PARTITION BY value_2 ORDER BY value_1 RANGE BETWEEN 1 PRECEDING AND 1 FOLLOWING),
range_window_exclude as (PARTITION BY value_2 ORDER BY value_1 RANGE BETWEEN 1 PRECEDING AND 1 FOLLOWING EXCLUDE CURRENT ROW)
ORDER BY
value_2, value_1, 3, 4;
-- test <offset> preceding and <offset> following on ROW window
SELECT
value_2,
value_1,
array_agg(value_1) OVER row_window,
array_agg(value_1) OVER row_window_exclude
FROM
users_table
WHERE
value_2 > 2 and value_2 < 6
WINDOW
row_window as (PARTITION BY value_2 ORDER BY value_1 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING),
row_window_exclude as (PARTITION BY value_2 ORDER BY value_1 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING EXCLUDE CURRENT ROW)
ORDER BY
value_2, value_1, 3, 4;
-- some tests with GROUP BY, HAVING and LIMIT
SELECT
user_id, sum(event_type) OVER my_win, event_type
@ -284,6 +346,18 @@ ORDER BY
LIMIT
5;
-- test PARTITION BY avg(...) ORDER BY avg(...)
SELECT
value_1,
avg(value_3),
dense_rank() OVER (PARTITION BY avg(value_3) ORDER BY avg(value_2))
FROM
users_table
GROUP BY
1
ORDER BY
1;
-- GROUP BY has more columns than PARTITION BY
SELECT
DISTINCT user_id, SUM(value_2) OVER (PARTITION BY user_id)
@ -347,6 +421,34 @@ ORDER BY
3 DESC, 2 DESC, 1 DESC;
$Q$);
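-- window functions with aggregates in their PARTITION BY, grouped by value_2
-- (a non-distribution column)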
SELECT
value_2,
AVG(avg(value_1)) OVER (PARTITION BY value_2, max(value_2), MIN(value_2)),
AVG(avg(value_2)) OVER (PARTITION BY value_2, min(value_2), AVG(value_1))
FROM
users_table
GROUP BY
1
ORDER BY
3 DESC, 2 DESC, 1 DESC;
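-- as above, but one window partitions by user_id and the other by value_2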
SELECT
value_2, user_id,
AVG(avg(value_1)) OVER (PARTITION BY value_2, max(value_2), MIN(value_2)),
AVG(avg(value_2)) OVER (PARTITION BY user_id, min(value_2), AVG(value_1))
FROM
users_table
GROUP BY
1, 2
ORDER BY
3 DESC, 2 DESC, 1 DESC;
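-- window function with an empty OVER () on top of a grouped aggregate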
SELECT user_id, sum(avg(user_id)) OVER ()
FROM users_table
GROUP BY user_id
ORDER BY 1
LIMIT 10;
SELECT
user_id,
1 + sum(value_1),
@ -454,3 +556,49 @@ FROM
GROUP BY user_id, value_2
ORDER BY user_id, avg(value_1) DESC
LIMIT 5;
-- Grouping can be pushed down with aggregates even when the window function can't
EXPLAIN (COSTS FALSE)
SELECT user_id, count(value_1), stddev(value_1), count(user_id) OVER (PARTITION BY random())
FROM users_table GROUP BY user_id HAVING avg(value_1) > 2 LIMIT 1;
-- Window function with inlined CTE
WITH cte as (
SELECT uref.id user_id, events_table.value_2, count(*) c
FROM events_table
JOIN users_ref_test_table uref ON uref.id = events_table.user_id
GROUP BY 1, 2
)
SELECT DISTINCT cte.value_2, cte.c, sum(cte.value_2) OVER (PARTITION BY cte.c)
FROM cte JOIN events_table et ON et.value_2 = cte.value_2 and et.value_2 = cte.c
ORDER BY 1;
-- There was a strange bug where window functions in this query would not be pushed down;
-- the bug depended on column ordering
CREATE TABLE daily_uniques (value_2 float, user_id bigint);
SELECT create_distributed_table('daily_uniques', 'user_id');
EXPLAIN (COSTS FALSE) SELECT
user_id,
sum(value_2) AS commits,
RANK () OVER (
PARTITION BY user_id
ORDER BY
sum(value_2) DESC
)
FROM daily_uniques
GROUP BY user_id
HAVING
sum(value_2) > 0
ORDER BY commits DESC
LIMIT 10;
DROP TABLE daily_uniques;
-- Partition by a reference table column joined to the distribution column
SELECT DISTINCT value_2, array_agg(rnk ORDER BY rnk) FROM (
SELECT events_table.value_2, sum(uref.k_no) OVER (PARTITION BY uref.id) AS rnk
FROM events_table
JOIN users_ref_test_table uref ON uref.id = events_table.user_id) sq
GROUP BY 1 ORDER BY 1;