Add select distinct, and distinct on support

Distinct, and distinct on() clauses are supported
in simple selects, joins, subqueries, and insert into select
queries.
pull/1701/head
Murat Tuncer 2017-10-04 13:54:30 +03:00 committed by Murat Tuncer
parent 6879f92e23
commit f7ab901766
18 changed files with 1689 additions and 34 deletions

View File

@ -30,6 +30,7 @@
#include "optimizer/clauses.h"
#include "optimizer/planner.h"
#include "optimizer/restrictinfo.h"
#include "optimizer/tlist.h"
#include "optimizer/var.h"
#include "parser/parsetree.h"
#include "parser/parse_coerce.h"
@ -53,6 +54,7 @@ static DeferredErrorMessage * DistributedInsertSelectSupported(Query *queryTree,
RangeTblEntry *subqueryRte,
bool allReferenceTables);
static DeferredErrorMessage * MultiTaskRouterSelectQuerySupported(Query *query);
static bool HasUnsupportedDistinctOn(Query *query);
static DeferredErrorMessage * InsertPartitionColumnMatchesSelect(Query *query,
RangeTblEntry *insertRte,
RangeTblEntry *
@ -777,6 +779,7 @@ MultiTaskRouterSelectQuerySupported(Query *query)
List *queryList = NIL;
ListCell *queryCell = NULL;
StringInfo errorDetail = NULL;
bool hasUnsupportedDistinctOn = false;
ExtractQueryWalker((Node *) query, &queryList);
foreach(queryCell, queryList)
@ -865,14 +868,14 @@ MultiTaskRouterSelectQuerySupported(Query *query)
}
/*
* We cannot support DISTINCT ON clauses since it could be on a non-partition column.
* In that case, there is no way that Citus can support this.
* We don't support DISTINCT ON clauses on non-partition columns.
*/
if (subquery->hasDistinctOn)
hasUnsupportedDistinctOn = HasUnsupportedDistinctOn(subquery);
if (hasUnsupportedDistinctOn)
{
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"DISTINCT ON clauses are not allowed in distributed "
"INSERT ... SELECT queries",
"DISTINCT ON (non-partition column) clauses are not "
"allowed in distributed INSERT ... SELECT queries",
NULL, NULL);
}
}
@ -881,6 +884,36 @@ MultiTaskRouterSelectQuerySupported(Query *query)
}
/*
* HasUnsupportedDistinctOn returns true if the query has distinct on and
* distinct targets do not contain partition column.
*/
static bool
HasUnsupportedDistinctOn(Query *query)
{
ListCell *distinctCell = NULL;
if (!query->hasDistinctOn)
{
return false;
}
foreach(distinctCell, query->distinctClause)
{
SortGroupClause *distinctClause = lfirst(distinctCell);
TargetEntry *distinctEntry = get_sortgroupclause_tle(distinctClause,
query->targetList);
if (IsPartitionColumn(distinctEntry->expr, query))
{
return false;
}
}
return true;
}
/*
* InsertPartitionColumnMatchesSelect returns NULL the partition column in the
* table targeted by INSERTed matches with the any of the SELECTed table's

View File

@ -1298,6 +1298,8 @@ MasterExtendedOpNode(MultiExtendedOp *originalOpNode)
masterExtendedOpNode->targetList = newTargetEntryList;
masterExtendedOpNode->groupClauseList = originalOpNode->groupClauseList;
masterExtendedOpNode->sortClauseList = originalOpNode->sortClauseList;
masterExtendedOpNode->distinctClause = originalOpNode->distinctClause;
masterExtendedOpNode->hasDistinctOn = originalOpNode->hasDistinctOn;
masterExtendedOpNode->limitCount = originalOpNode->limitCount;
masterExtendedOpNode->limitOffset = originalOpNode->limitOffset;
masterExtendedOpNode->havingQual = newHavingQual;
@ -1787,6 +1789,7 @@ WorkerExtendedOpNode(MultiExtendedOp *originalOpNode,
WorkerAggregateWalkerContext *walkerContext =
palloc0(sizeof(WorkerAggregateWalkerContext));
Index nextSortGroupRefIndex = 0;
bool queryHasAggregates = false;
walkerContext->repartitionSubquery = false;
walkerContext->expressionList = NIL;
@ -1826,6 +1829,7 @@ WorkerExtendedOpNode(MultiExtendedOp *originalOpNode,
WorkerAggregateWalker((Node *) originalExpression, walkerContext);
newExpressionList = walkerContext->expressionList;
queryHasAggregates = true;
}
else
{
@ -1924,6 +1928,15 @@ WorkerExtendedOpNode(MultiExtendedOp *originalOpNode,
workerExtendedOpNode = CitusMakeNode(MultiExtendedOp);
workerExtendedOpNode->targetList = newTargetEntryList;
workerExtendedOpNode->distinctClause = NIL;
workerExtendedOpNode->hasDistinctOn = false;
if (!queryHasAggregates)
{
workerExtendedOpNode->distinctClause = originalOpNode->distinctClause;
workerExtendedOpNode->hasDistinctOn = originalOpNode->hasDistinctOn;
}
workerExtendedOpNode->groupClauseList = groupClauseList;
/* if we can push down the limit, also set related fields */
@ -3356,10 +3369,10 @@ WorkerLimitCount(MultiExtendedOp *originalOpNode,
/*
* WorkerSortClauseList first checks if the given extended node contains a limit
* that can be pushed down. If it does, the function then checks if we need to
* add any sorting and grouping clauses to the sort list we push down for the
* limit. If we do, the function adds these clauses and returns them. Otherwise,
* the function returns null.
* or hasDistinctOn that can be pushed down. If it does, the function then
* checks if we need to add any sorting and grouping clauses to the sort list we
* push down for the limit. If we do, the function adds these clauses and
* returns them. Otherwise, the function returns null.
*/
static List *
WorkerSortClauseList(MultiExtendedOp *originalOpNode,
@ -3370,8 +3383,8 @@ WorkerSortClauseList(MultiExtendedOp *originalOpNode,
List *sortClauseList = originalOpNode->sortClauseList;
List *targetList = originalOpNode->targetList;
/* if no limit node, no need to push down sort clauses */
if (originalOpNode->limitCount == NULL)
/* if no limit node and no hasDistinctOn, no need to push down sort clauses */
if (originalOpNode->limitCount == NULL && !originalOpNode->hasDistinctOn)
{
return NIL;
}

View File

@ -1805,13 +1805,6 @@ ErrorIfQueryNotSupported(Query *queryTree)
errorHint = filterHint;
}
if (queryTree->distinctClause)
{
preconditionsSatisfied = false;
errorMessage = "could not run distributed query with DISTINCT clause";
errorHint = filterHint;
}
if (queryTree->groupingSets)
{
preconditionsSatisfied = false;
@ -2850,6 +2843,8 @@ MultiExtendedOpNode(Query *queryTree)
extendedOpNode->limitCount = queryTree->limitCount;
extendedOpNode->limitOffset = queryTree->limitOffset;
extendedOpNode->havingQual = queryTree->havingQual;
extendedOpNode->distinctClause = queryTree->distinctClause;
extendedOpNode->hasDistinctOn = queryTree->hasDistinctOn;
return extendedOpNode;
}

View File

@ -151,6 +151,73 @@ BuildAggregatePlan(Query *masterQuery, Plan *subPlan)
}
/*
* BuildDistinctPlan creates an returns a plan for distinct. Depending on
* availability of hash function it chooses HashAgg over Sort/Unique
* plans.
* This function has a potential performance issue since we blindly set
* Plan nodes without looking at cost. We might need to revisit this
* if we have performance issues with select distinct queries.
*/
static Plan *
BuildDistinctPlan(Query *masterQuery, Plan *subPlan)
{
Plan *distinctPlan = NULL;
bool distinctClausesHashable = true;
List *distinctClauseList = masterQuery->distinctClause;
List *targetList = copyObject(masterQuery->targetList);
List *columnList = pull_var_clause_default((Node *) targetList);
ListCell *columnCell = NULL;
if (IsA(subPlan, Agg))
{
return subPlan;
}
Assert(masterQuery->distinctClause);
Assert(!masterQuery->hasDistinctOn);
/*
* For upper level plans above the sequential scan, the planner expects the
* table id (varno) to be set to OUTER_VAR.
*/
foreach(columnCell, columnList)
{
Var *column = (Var *) lfirst(columnCell);
column->varno = OUTER_VAR;
}
/*
* Create group by plan with HashAggregate if all distinct
* members are hashable, Otherwise create sort+unique plan.
*/
distinctClausesHashable = grouping_is_hashable(distinctClauseList);
if (distinctClausesHashable)
{
const long rowEstimate = 10; /* using the same value as BuildAggregatePlan() */
AttrNumber *distinctColumnIdArray = extract_grouping_cols(distinctClauseList,
subPlan->targetlist);
Oid *distinctColumnOpArray = extract_grouping_ops(distinctClauseList);
uint32 distinctClauseCount = list_length(distinctClauseList);
distinctPlan = (Plan *) make_agg(targetList, NIL, AGG_HASHED,
AGGSPLIT_SIMPLE, distinctClauseCount,
distinctColumnIdArray,
distinctColumnOpArray, NIL, NIL,
rowEstimate, subPlan);
}
else
{
Sort *sortPlan = make_sort_from_sortclauses(masterQuery->distinctClause,
subPlan);
distinctPlan = (Plan *) make_unique_from_sortclauses((Plan *) sortPlan,
masterQuery->distinctClause);
}
return distinctPlan;
}
/*
* BuildSelectStatement builds the final select statement to run on the master
* node, before returning results to the user. The function first gets the custom
@ -166,6 +233,7 @@ BuildSelectStatement(Query *masterQuery, List *masterTargetList, CustomScan *rem
Plan *topLevelPlan = NULL;
ListCell *targetEntryCell = NULL;
List *columnNameList = NULL;
List *sortClauseList = copyObject(masterQuery->sortClause);
/* (1) make PlannedStmt and set basic information */
selectStatement = makeNode(PlannedStmt);
@ -203,10 +271,47 @@ BuildSelectStatement(Query *masterQuery, List *masterTargetList, CustomScan *rem
topLevelPlan = &remoteScan->scan.plan;
}
/* (3) add a sorting plan if needed */
if (masterQuery->sortClause)
/*
* (3) create distinct plan if needed.
*
* distinct on() requires sort + unique plans. Unique itself is not enough
* as it only compares the current value with previous one when checking
* uniqueness, thus ordering is necessary. If already has order by
* clause we append distinct clauses to the end of it. Postgresql requires
* that if both distinct on() and order by exists, ordering shall start
* on distinct clauses. Therefore we can safely append distinct clauses to
* the end of order by clauses. Although the same column may appear more
* than once in order by clauses, created plan uses only one instance, for
* example order by a,b,a,a,b,c is translated to equivalent order by a,b,c.
*
* If the query has distinct clause but not distinct on, we first create
* distinct plan that is either HashAggreate or Sort + Unique plans depending
* on hashable property of columns in distinct clause. If there is order by
* clause, it is handled after distinct planning.
*/
if (masterQuery->hasDistinctOn)
{
ListCell *distinctCell = NULL;
foreach(distinctCell, masterQuery->distinctClause)
{
SortGroupClause *singleDistinctClause = lfirst(distinctCell);
Index sortGroupRef = singleDistinctClause->tleSortGroupRef;
if (get_sortgroupref_clause_noerr(sortGroupRef, sortClauseList) == NULL)
{
sortClauseList = lappend(sortClauseList, singleDistinctClause);
}
}
}
else if (masterQuery->distinctClause)
{
Plan *distinctPlan = BuildDistinctPlan(masterQuery, topLevelPlan);
topLevelPlan = distinctPlan;
}
/* (4) add a sorting plan if needed */
if (sortClauseList)
{
List *sortClauseList = masterQuery->sortClause;
Sort *sortPlan = make_sort_from_sortclauses(sortClauseList, topLevelPlan);
/* just for reproducible costs between different PostgreSQL versions */
@ -217,7 +322,20 @@ BuildSelectStatement(Query *masterQuery, List *masterTargetList, CustomScan *rem
topLevelPlan = (Plan *) sortPlan;
}
/* (4) add a limit plan if needed */
/*
* (5) add a unique plan for distinctOn.
* If the query has distinct on we add a sort clause in step 3. Therefore
* Step 4 always creates a sort plan.
* */
if (masterQuery->hasDistinctOn)
{
Assert(IsA(topLevelPlan, Sort));
topLevelPlan =
(Plan *) make_unique_from_sortclauses(topLevelPlan,
masterQuery->distinctClause);
}
/* (5) add a limit plan if needed */
if (masterQuery->limitCount || masterQuery->limitOffset)
{
Node *limitCount = masterQuery->limitCount;
@ -226,7 +344,7 @@ BuildSelectStatement(Query *masterQuery, List *masterTargetList, CustomScan *rem
topLevelPlan = (Plan *) limitPlan;
}
/* (5) finally set our top level plan in the plan tree */
/* (6) finally set our top level plan in the plan tree */
selectStatement->planTree = topLevelPlan;
return selectStatement;

View File

@ -578,6 +578,8 @@ BuildJobQuery(MultiNode *multiNode, List *dependedJobList)
FromExpr *joinTree = NULL;
Node *joinRoot = NULL;
Node *havingQual = NULL;
bool hasDistinctOn = false;
List *distinctClause = NIL;
/* we start building jobs from below the collect node */
Assert(!CitusIsA(multiNode, MultiCollect));
@ -625,6 +627,8 @@ BuildJobQuery(MultiNode *multiNode, List *dependedJobList)
{
MultiExtendedOp *extendedOp = (MultiExtendedOp *) linitial(extendedOpNodeList);
targetList = copyObject(extendedOp->targetList);
distinctClause = extendedOp->distinctClause;
hasDistinctOn = extendedOp->hasDistinctOn;
}
else
{
@ -699,6 +703,8 @@ BuildJobQuery(MultiNode *multiNode, List *dependedJobList)
jobQuery->limitCount = limitCount;
jobQuery->havingQual = havingQual;
jobQuery->hasAggs = contain_agg_clause((Node *) targetList);
jobQuery->distinctClause = distinctClause;
jobQuery->hasDistinctOn = hasDistinctOn;
return jobQuery;
}
@ -1405,6 +1411,9 @@ BuildSubqueryJobQuery(MultiNode *multiNode)
Node *limitCount = NULL;
Node *limitOffset = NULL;
FromExpr *joinTree = NULL;
bool hasAggregates = false;
List *distinctClause = NIL;
bool hasDistinctOn = false;
/* we start building jobs from below the collect node */
Assert(!CitusIsA(multiNode, MultiCollect));
@ -1450,6 +1459,8 @@ BuildSubqueryJobQuery(MultiNode *multiNode)
limitOffset = extendedOp->limitOffset;
sortClauseList = extendedOp->sortClauseList;
havingQual = extendedOp->havingQual;
distinctClause = extendedOp->distinctClause;
hasDistinctOn = extendedOp->hasDistinctOn;
}
/* build group clauses */
@ -1458,6 +1469,19 @@ BuildSubqueryJobQuery(MultiNode *multiNode)
/* build the where clause list using select predicates */
whereClauseList = QuerySelectClauseList(multiNode);
if (contain_agg_clause((Node *) targetList) ||
contain_agg_clause((Node *) havingQual))
{
hasAggregates = true;
}
/* distinct is not send to worker query if there are top level aggregates */
if (hasAggregates)
{
hasDistinctOn = false;
distinctClause = NIL;
}
/*
* Build the From/Where construct. We keep the where-clause list implicitly
* AND'd, since both partition and join pruning depends on the clauses being
@ -1480,8 +1504,9 @@ BuildSubqueryJobQuery(MultiNode *multiNode)
jobQuery->limitOffset = limitOffset;
jobQuery->limitCount = limitCount;
jobQuery->havingQual = havingQual;
jobQuery->hasAggs = contain_agg_clause((Node *) targetList) ||
contain_agg_clause((Node *) havingQual);
jobQuery->hasAggs = hasAggregates;
jobQuery->hasDistinctOn = hasDistinctOn;
jobQuery->distinctClause = distinctClause;
return jobQuery;
}

View File

@ -0,0 +1,71 @@
/*-------------------------------------------------------------------------
*
* postgres_planning_function.c
* Includes planning routines copied from
* src/backend/optimizer/plan/createplan.c
*
* Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* This needs to be closely in sync with the core code.
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "distributed/multi_master_planner.h"
#include "nodes/plannodes.h"
#include "optimizer/tlist.h"
/*
* make_unique_from_sortclauses creates and returns a unique node
* from provided distinct clause list.
* The functions is copied from postgresql from
* src/backend/optimizer/plan/createplan.c.
*
* distinctList is a list of SortGroupClauses, identifying the targetlist items
* that should be considered by the Unique filter. The input path must
* already be sorted accordingly.
*/
Unique *
make_unique_from_sortclauses(Plan *lefttree, List *distinctList)
{
Unique *node = makeNode(Unique);
Plan *plan = &node->plan;
int numCols = list_length(distinctList);
int keyno = 0;
AttrNumber *uniqColIdx;
Oid *uniqOperators;
ListCell *slitem;
plan->targetlist = lefttree->targetlist;
plan->qual = NIL;
plan->lefttree = lefttree;
plan->righttree = NULL;
/*
* convert SortGroupClause list into arrays of attr indexes and equality
* operators, as wanted by executor
*/
Assert(numCols > 0);
uniqColIdx = (AttrNumber *) palloc(sizeof(AttrNumber) * numCols);
uniqOperators = (Oid *) palloc(sizeof(Oid) * numCols);
foreach(slitem, distinctList)
{
SortGroupClause *sortcl = (SortGroupClause *) lfirst(slitem);
TargetEntry *tle = get_sortgroupclause_tle(sortcl, plan->targetlist);
uniqColIdx[keyno] = tle->resno;
uniqOperators[keyno] = sortcl->eqop;
Assert(OidIsValid(uniqOperators[keyno]));
keyno++;
}
node->numCols = numCols;
node->uniqColIdx = uniqColIdx;
node->uniqOperators = uniqOperators;
return node;
}

View File

@ -292,6 +292,8 @@ OutMultiExtendedOp(OUTFUNC_ARGS)
WRITE_NODE_FIELD(limitCount);
WRITE_NODE_FIELD(limitOffset);
WRITE_NODE_FIELD(havingQual);
WRITE_BOOL_FIELD(hasDistinctOn);
WRITE_NODE_FIELD(distinctClause);
OutMultiUnaryNodeFields(str, (const MultiUnaryNode *) node);
}

View File

@ -173,6 +173,8 @@ typedef struct MultiExtendedOp
Node *limitCount;
Node *limitOffset;
Node *havingQual;
List *distinctClause;
bool hasDistinctOn;
} MultiExtendedOp;

View File

@ -24,6 +24,7 @@ struct MultiPlan;
struct CustomScan;
extern PlannedStmt * MasterNodeSelectPlan(struct MultiPlan *multiPlan,
struct CustomScan *dataScan);
extern Unique * make_unique_from_sortclauses(Plan *lefttree, List *distinctList);
#endif /* MULTI_MASTER_PLANNER_H */

View File

@ -454,4 +454,74 @@ SELECT count(*), count(DISTINCT user_id), avg(user_id) FROM agg_results;
14371 | 101 | 50.5232064574490293
(1 row)
-- DISTINCT in the outer query and DISTINCT in the subquery
TRUNCATE agg_results;
INSERT INTO agg_results(user_id)
SELECT
DISTINCT users_ids.user_id
FROM
(SELECT DISTINCT user_id FROM users_table) as users_ids
JOIN
(SELECT
ma.user_id, ma.value_1, (GREATEST(coalesce(ma.value_4 / 250, 0.0) + GREATEST(1.0))) / 2 AS prob
FROM
users_table AS ma, events_table as short_list
WHERE
short_list.user_id = ma.user_id and ma.value_1 < 50 and short_list.event_type < 3
) temp
ON users_ids.user_id = temp.user_id
WHERE temp.value_1 < 50;
-- get some statistics from the aggregated results to ensure the results are correct
SELECT count(*), count(DISTINCT user_id), avg(user_id) FROM agg_results;
count | count | avg
-------+-------+---------------------
27 | 27 | 54.0000000000000000
(1 row)
-- DISTINCT ON in the outer query and DISTINCT in the subquery
TRUNCATE agg_results;
INSERT INTO agg_results(user_id, value_1_agg, value_2_agg)
SELECT
DISTINCT ON (users_ids.user_id) users_ids.user_id, temp.value_1, prob
FROM
(SELECT DISTINCT user_id FROM users_table) as users_ids
JOIN
(SELECT
ma.user_id, ma.value_1, (GREATEST(coalesce(ma.value_4 / 250, 0.0) + GREATEST(1.0))) / 2 AS prob
FROM
users_table AS ma, events_table as short_list
WHERE
short_list.user_id = ma.user_id and ma.value_1 < 50 and short_list.event_type < 15
) temp
ON users_ids.user_id = temp.user_id
WHERE temp.value_1 < 50
ORDER BY 1, 2;
SELECT count(*), count(DISTINCT user_id), avg(user_id), avg(value_1_agg) FROM agg_results;
count | count | avg | avg
-------+-------+---------------------+---------------------
80 | 80 | 50.7875000000000000 | 10.0125000000000000
(1 row)
-- DISTINCT ON in the outer query and DISTINCT ON in the subquery
TRUNCATE agg_results;
INSERT INTO agg_results(user_id, value_1_agg, value_2_agg)
SELECT
DISTINCT ON (users_ids.user_id) users_ids.user_id, temp.value_1, prob
FROM
(SELECT DISTINCT ON (user_id) user_id, value_2 FROM users_table ORDER BY 1,2) as users_ids
JOIN
(SELECT
ma.user_id, ma.value_1, (GREATEST(coalesce(ma.value_4 / 250, 0.0) + GREATEST(1.0))) / 2 AS prob
FROM
users_table AS ma, events_table as short_list
WHERE
short_list.user_id = ma.user_id and ma.value_1 < 5000 and short_list.event_type < 3
) temp
ON users_ids.user_id = temp.user_id
ORDER BY 1, 2;
SELECT count(*), count(DISTINCT user_id), avg(user_id), avg(value_1_agg) FROM agg_results;
count | count | avg | avg
-------+-------+---------------------+--------------------
27 | 27 | 54.0000000000000000 | 9.8518518518518519
(1 row)

View File

@ -626,16 +626,60 @@ DEBUG: Plan is router executable
SET client_min_messages TO INFO;
truncate agg_events;
SET client_min_messages TO DEBUG2;
-- we do not support DISTINCT ON clauses
-- DISTINCT ON clauses are supported
-- distinct on(non-partition column)
-- values are pulled to master
INSERT INTO agg_events (value_1_agg, user_id)
SELECT
DISTINCT ON (value_1) value_1, user_id
FROM
raw_events_first;
DEBUG: DISTINCT ON clauses are not allowed in distributed INSERT ... SELECT queries
DEBUG: DISTINCT ON (non-partition column) clauses are not allowed in distributed INSERT ... SELECT queries
DEBUG: Collecting INSERT ... SELECT results on coordinator
ERROR: could not run distributed query with DISTINCT clause
HINT: Consider using an equality filter on the distributed table's partition column.
SELECT user_id, value_1_agg FROM agg_events ORDER BY 1,2;
user_id | value_1_agg
---------+-------------
1 | 10
2 | 20
3 | 30
4 | 40
5 | 50
6 | 60
7 |
8 | 80
9 | 90
(9 rows)
-- we don't want to see constraint vialotions, so truncate first
SET client_min_messages TO INFO;
truncate agg_events;
SET client_min_messages TO DEBUG2;
-- distinct on(partition column)
-- queries are forwared to workers
INSERT INTO agg_events (value_1_agg, user_id)
SELECT
DISTINCT ON (user_id) value_1, user_id
FROM
raw_events_first;
DEBUG: distributed statement: INSERT INTO public.agg_events_13300008 AS citus_table_alias (user_id, value_1_agg) SELECT DISTINCT ON (user_id) user_id, value_1 FROM public.raw_events_first_13300000 raw_events_first WHERE ((worker_hash(user_id) >= '-2147483648'::integer) AND (worker_hash(user_id) <= '-1073741825'::integer))
DEBUG: distributed statement: INSERT INTO public.agg_events_13300009 AS citus_table_alias (user_id, value_1_agg) SELECT DISTINCT ON (user_id) user_id, value_1 FROM public.raw_events_first_13300001 raw_events_first WHERE ((worker_hash(user_id) >= '-1073741824'::integer) AND (worker_hash(user_id) <= '-1'::integer))
DEBUG: distributed statement: INSERT INTO public.agg_events_13300010 AS citus_table_alias (user_id, value_1_agg) SELECT DISTINCT ON (user_id) user_id, value_1 FROM public.raw_events_first_13300002 raw_events_first WHERE ((worker_hash(user_id) >= 0) AND (worker_hash(user_id) <= 1073741823))
DEBUG: distributed statement: INSERT INTO public.agg_events_13300011 AS citus_table_alias (user_id, value_1_agg) SELECT DISTINCT ON (user_id) user_id, value_1 FROM public.raw_events_first_13300003 raw_events_first WHERE ((worker_hash(user_id) >= 1073741824) AND (worker_hash(user_id) <= 2147483647))
DEBUG: Plan is router executable
SELECT user_id, value_1_agg FROM agg_events ORDER BY 1,2;
user_id | value_1_agg
---------+-------------
1 | 10
2 | 20
3 | 30
4 | 40
5 | 50
6 | 60
7 |
8 | 80
9 | 90
(9 rows)
-- We do not support some CTEs
WITH fist_table_agg AS
(SELECT sum(value_1) as v1_agg, user_id FROM raw_events_first GROUP BY user_id)

View File

@ -0,0 +1,824 @@
--
-- MULTI_SELECT_DISTINCT
--
-- Tests select distinct, and select distinct on features.
--
-- function calls are supported
SELECT DISTINCT l_orderkey, now() FROM lineitem_hash_part LIMIT 0;
l_orderkey | now
------------+-----
(0 rows)
SELECT DISTINCT l_partkey, 1 + (random() * 0)::int FROM lineitem_hash_part ORDER BY 1 DESC LIMIT 3;
l_partkey | ?column?
-----------+----------
199973 | 1
199946 | 1
199943 | 1
(3 rows)
-- const expressions are supported
SELECT DISTINCT l_orderkey, 1+1 FROM lineitem_hash_part ORDER BY 1 LIMIT 5;
l_orderkey | ?column?
------------+----------
1 | 2
2 | 2
3 | 2
4 | 2
5 | 2
(5 rows)
-- non const expressions are also supported
SELECT DISTINCT l_orderkey, l_partkey + 1 FROM lineitem_hash_part ORDER BY 1, 2 LIMIT 5;
l_orderkey | ?column?
------------+----------
1 | 2133
1 | 15636
1 | 24028
1 | 63701
1 | 67311
(5 rows)
-- column expressions are supported
SELECT DISTINCT l_orderkey, l_shipinstruct || l_shipmode FROM lineitem_hash_part ORDER BY 2 , 1 LIMIT 5;
l_orderkey | ?column?
------------+----------------
32 | COLLECT CODAIR
39 | COLLECT CODAIR
66 | COLLECT CODAIR
70 | COLLECT CODAIR
98 | COLLECT CODAIR
(5 rows)
-- function calls with const input are supported
SELECT DISTINCT l_orderkey, strpos('AIR', 'A') FROM lineitem_hash_part ORDER BY 1,2 LIMIT 5;
l_orderkey | strpos
------------+--------
1 | 1
2 | 1
3 | 1
4 | 1
5 | 1
(5 rows)
-- function calls with non-const input are supported
SELECT DISTINCT l_orderkey, strpos(l_shipmode, 'I')
FROM lineitem_hash_part
WHERE strpos(l_shipmode, 'I') > 1
ORDER BY 2, 1
LIMIT 5;
l_orderkey | strpos
------------+--------
1 | 2
3 | 2
5 | 2
32 | 2
33 | 2
(5 rows)
-- distinct on partition column
-- verify counts match with respect to count(distinct)
CREATE TEMP TABLE temp_orderkeys AS SELECT DISTINCT l_orderkey FROM lineitem_hash_part;
SELECT COUNT(*) FROM temp_orderkeys;
count
-------
2985
(1 row)
SELECT COUNT(DISTINCT l_orderkey) FROM lineitem_hash_part;
count
-------
2985
(1 row)
SELECT DISTINCT l_orderkey FROM lineitem_hash_part WHERE l_orderkey < 500 and l_partkey < 5000 order by 1;
l_orderkey
------------
1
3
32
35
39
65
129
130
134
164
194
228
261
290
320
321
354
418
(18 rows)
-- distinct on non-partition column
SELECT DISTINCT l_partkey FROM lineitem_hash_part WHERE l_orderkey > 5 and l_orderkey < 20 order by 1;
l_partkey
-----------
79251
94780
139636
145243
151894
157238
163073
182052
(8 rows)
SELECT DISTINCT l_shipmode FROM lineitem_hash_part ORDER BY 1 DESC;
l_shipmode
------------
TRUCK
SHIP
REG AIR
RAIL
MAIL
FOB
AIR
(7 rows)
-- distinct with multiple columns
SELECT DISTINCT l_orderkey, o_orderdate
FROM lineitem_hash_part JOIN orders_hash_part ON (l_orderkey = o_orderkey)
WHERE l_orderkey < 10
ORDER BY l_orderkey;
l_orderkey | o_orderdate
------------+-------------
1 | 01-02-1996
2 | 12-01-1996
3 | 10-14-1993
4 | 10-11-1995
5 | 07-30-1994
6 | 02-21-1992
7 | 01-10-1996
(7 rows)
-- distinct on partition column with aggregate
-- this is the same as the one without distinct due to group by
SELECT DISTINCT l_orderkey, count(*)
FROM lineitem_hash_part
WHERE l_orderkey < 200
GROUP BY 1
HAVING count(*) > 5
ORDER BY 2 DESC, 1;
l_orderkey | count
------------+-------
7 | 7
68 | 7
129 | 7
164 | 7
194 | 7
1 | 6
3 | 6
32 | 6
35 | 6
39 | 6
67 | 6
69 | 6
70 | 6
71 | 6
134 | 6
135 | 6
163 | 6
192 | 6
197 | 6
(19 rows)
-- explain the query to see actual plan
EXPLAIN (COSTS FALSE)
SELECT DISTINCT l_orderkey, count(*)
FROM lineitem_hash_part
WHERE l_orderkey < 200
GROUP BY 1
HAVING count(*) > 5
ORDER BY 2 DESC, 1;
QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------------------------------------------
Sort
Sort Key: COALESCE((pg_catalog.sum((COALESCE((pg_catalog.sum(remote_scan.count))::bigint, '0'::bigint))))::bigint, '0'::bigint) DESC, remote_scan.l_orderkey
-> HashAggregate
Group Key: remote_scan.l_orderkey
Filter: (COALESCE((pg_catalog.sum(remote_scan.worker_column_3))::bigint, '0'::bigint) > 5)
-> Custom Scan (Citus Real-Time)
Task Count: 4
Tasks Shown: One of 4
-> Task
Node: host=localhost port=57637 dbname=regression
-> HashAggregate
Group Key: l_orderkey
Filter: (count(*) > 5)
-> Seq Scan on lineitem_hash_part_360290 lineitem_hash_part
Filter: (l_orderkey < 200)
(15 rows)
-- distinct on non-partition column with aggregate
-- this is the same as non-distinct version due to group by
SELECT DISTINCT l_partkey, count(*)
FROM lineitem_hash_part
GROUP BY 1
HAVING count(*) > 2
ORDER BY 1;
l_partkey | count
-----------+-------
1051 | 3
1927 | 3
6983 | 3
15283 | 3
87761 | 3
136884 | 3
149926 | 3
160895 | 3
177771 | 3
188804 | 3
199146 | 3
(11 rows)
-- explain the query to see actual plan
EXPLAIN (COSTS FALSE)
SELECT DISTINCT l_partkey, count(*)
FROM lineitem_hash_part
GROUP BY 1
HAVING count(*) > 2
ORDER BY 1;
QUERY PLAN
----------------------------------------------------------------------------------------------------
Sort
Sort Key: remote_scan.l_partkey
-> HashAggregate
Group Key: remote_scan.l_partkey
Filter: (COALESCE((pg_catalog.sum(remote_scan.worker_column_3))::bigint, '0'::bigint) > 2)
-> Custom Scan (Citus Real-Time)
Task Count: 4
Tasks Shown: One of 4
-> Task
Node: host=localhost port=57637 dbname=regression
-> HashAggregate
Group Key: l_partkey
-> Seq Scan on lineitem_hash_part_360290 lineitem_hash_part
(13 rows)
-- distinct on non-partition column and avg
SELECT DISTINCT l_partkey, avg(l_linenumber)
FROM lineitem_hash_part
WHERE l_partkey < 500
GROUP BY 1
HAVING avg(l_linenumber) > 2
ORDER BY 1;
l_partkey | avg
-----------+--------------------
18 | 7.0000000000000000
79 | 6.0000000000000000
149 | 4.5000000000000000
175 | 5.0000000000000000
179 | 6.0000000000000000
182 | 3.0000000000000000
222 | 4.0000000000000000
278 | 3.0000000000000000
299 | 7.0000000000000000
308 | 7.0000000000000000
309 | 5.0000000000000000
321 | 3.0000000000000000
337 | 6.0000000000000000
364 | 3.0000000000000000
403 | 4.0000000000000000
(15 rows)
-- distinct on multiple non-partition columns
SELECT DISTINCT l_partkey, l_suppkey
FROM lineitem_hash_part
WHERE l_shipmode = 'AIR' AND l_orderkey < 100
ORDER BY 1, 2;
l_partkey | l_suppkey
-----------+-----------
2132 | 4633
4297 | 1798
37531 | 35
44161 | 6666
44706 | 4707
67831 | 5350
85811 | 8320
94368 | 6878
108338 | 849
108570 | 8571
137267 | 4807
137469 | 9983
173489 | 3490
196156 | 1195
197921 | 441
(15 rows)
EXPLAIN (COSTS FALSE)
SELECT DISTINCT l_partkey, l_suppkey
FROM lineitem_hash_part
WHERE l_shipmode = 'AIR' AND l_orderkey < 100
ORDER BY 1, 2;
QUERY PLAN
-----------------------------------------------------------------------------------------------------
Sort
Sort Key: remote_scan.l_partkey, remote_scan.l_suppkey
-> HashAggregate
Group Key: remote_scan.l_partkey, remote_scan.l_suppkey
-> Custom Scan (Citus Real-Time)
Task Count: 4
Tasks Shown: One of 4
-> Task
Node: host=localhost port=57637 dbname=regression
-> Unique
-> Sort
Sort Key: l_partkey, l_suppkey
-> Seq Scan on lineitem_hash_part_360290 lineitem_hash_part
Filter: ((l_orderkey < 100) AND (l_shipmode = 'AIR'::bpchar))
(14 rows)
-- distinct on partition column
SELECT DISTINCT ON (l_orderkey) l_orderkey, l_partkey, l_suppkey
FROM lineitem_hash_part
WHERE l_orderkey < 35
ORDER BY 1;
l_orderkey | l_partkey | l_suppkey
------------+-----------+-----------
1 | 155190 | 7706
2 | 106170 | 1191
3 | 4297 | 1798
4 | 88035 | 5560
5 | 108570 | 8571
6 | 139636 | 2150
7 | 182052 | 9607
32 | 82704 | 7721
33 | 61336 | 8855
34 | 88362 | 871
(10 rows)
EXPLAIN (COSTS FALSE)
SELECT DISTINCT ON (l_orderkey) l_orderkey, l_partkey, l_suppkey
FROM lineitem_hash_part
WHERE l_orderkey < 35
ORDER BY 1;
QUERY PLAN
----------------------------------------------------------------------------------------------
Unique
-> Sort
Sort Key: remote_scan.l_orderkey
-> Custom Scan (Citus Real-Time)
Task Count: 4
Tasks Shown: One of 4
-> Task
Node: host=localhost port=57637 dbname=regression
-> Unique
-> Sort
Sort Key: l_orderkey
-> Seq Scan on lineitem_hash_part_360290 lineitem_hash_part
Filter: (l_orderkey < 35)
(13 rows)
-- distinct on non-partition column
-- note order by is required here
-- otherwise query results will be different since
-- distinct on clause is on non-partition column
SELECT DISTINCT ON (l_partkey) l_partkey, l_orderkey
FROM lineitem_hash_part
ORDER BY 1,2
LIMIT 20;
l_partkey | l_orderkey
-----------+------------
18 | 12005
79 | 5121
91 | 2883
149 | 807
175 | 4102
179 | 2117
182 | 548
195 | 2528
204 | 10048
222 | 9413
245 | 9446
278 | 1287
299 | 1122
308 | 11137
309 | 2374
318 | 321
321 | 5984
337 | 10403
350 | 13698
358 | 4323
(20 rows)
EXPLAIN (COSTS FALSE)
SELECT DISTINCT ON (l_partkey) l_partkey, l_orderkey
FROM lineitem_hash_part
ORDER BY 1,2
LIMIT 20;
QUERY PLAN
----------------------------------------------------------------------------------------------------------
Limit
-> Unique
-> Sort
Sort Key: remote_scan.l_partkey, remote_scan.l_orderkey
-> Custom Scan (Citus Real-Time)
Task Count: 4
Tasks Shown: One of 4
-> Task
Node: host=localhost port=57637 dbname=regression
-> Limit
-> Unique
-> Sort
Sort Key: l_partkey, l_orderkey
-> Seq Scan on lineitem_hash_part_360290 lineitem_hash_part
(14 rows)
-- distinct on with joins
-- each customer's first order key
SELECT DISTINCT ON (o_custkey) o_custkey, l_orderkey
FROM lineitem_hash_part JOIN orders_hash_part ON (l_orderkey = o_orderkey)
WHERE o_custkey < 15
ORDER BY 1,2;
o_custkey | l_orderkey
-----------+------------
1 | 9154
2 | 10563
4 | 320
5 | 11682
7 | 10402
8 | 102
10 | 1602
11 | 12800
13 | 994
14 | 11011
(10 rows)
EXPLAIN (COSTS FALSE)
SELECT DISTINCT ON (o_custkey) o_custkey, l_orderkey
FROM lineitem_hash_part JOIN orders_hash_part ON (l_orderkey = o_orderkey)
WHERE o_custkey < 15
ORDER BY 1,2;
QUERY PLAN
----------------------------------------------------------------------------------------------------------------
Unique
-> Sort
Sort Key: remote_scan.o_custkey, remote_scan.l_orderkey
-> Custom Scan (Citus Real-Time)
Task Count: 4
Tasks Shown: One of 4
-> Task
Node: host=localhost port=57637 dbname=regression
-> Unique
-> Sort
Sort Key: orders_hash_part.o_custkey, lineitem_hash_part.l_orderkey
-> Hash Join
Hash Cond: (lineitem_hash_part.l_orderkey = orders_hash_part.o_orderkey)
-> Seq Scan on lineitem_hash_part_360290 lineitem_hash_part
-> Hash
-> Seq Scan on orders_hash_part_360294 orders_hash_part
Filter: (o_custkey < 15)
(17 rows)
-- explain without order by
-- notice master plan has order by on distinct on column
EXPLAIN (COSTS FALSE)
SELECT DISTINCT ON (o_custkey) o_custkey, l_orderkey
FROM lineitem_hash_part JOIN orders_hash_part ON (l_orderkey = o_orderkey)
WHERE o_custkey < 15;
QUERY PLAN
----------------------------------------------------------------------------------------------------------------
Unique
-> Sort
Sort Key: remote_scan.o_custkey
-> Custom Scan (Citus Real-Time)
Task Count: 4
Tasks Shown: One of 4
-> Task
Node: host=localhost port=57637 dbname=regression
-> Unique
-> Sort
Sort Key: orders_hash_part.o_custkey
-> Hash Join
Hash Cond: (lineitem_hash_part.l_orderkey = orders_hash_part.o_orderkey)
-> Seq Scan on lineitem_hash_part_360290 lineitem_hash_part
-> Hash
-> Seq Scan on orders_hash_part_360294 orders_hash_part
Filter: (o_custkey < 15)
(17 rows)
-- each customer's each order's first l_partkey
SELECT DISTINCT ON (o_custkey, l_orderkey) o_custkey, l_orderkey, l_linenumber, l_partkey
FROM lineitem_hash_part JOIN orders_hash_part ON (l_orderkey = o_orderkey)
WHERE o_custkey < 20
ORDER BY 1,2,3;
o_custkey | l_orderkey | l_linenumber | l_partkey
-----------+------------+--------------+-----------
1 | 9154 | 1 | 86513
1 | 14656 | 1 | 59539
2 | 10563 | 1 | 147459
4 | 320 | 1 | 4415
4 | 739 | 1 | 84489
4 | 10688 | 1 | 45037
4 | 10788 | 1 | 50814
4 | 13728 | 1 | 86216
5 | 11682 | 1 | 31634
5 | 11746 | 1 | 180724
5 | 14308 | 1 | 157430
7 | 10402 | 1 | 53661
7 | 13031 | 1 | 112161
7 | 14145 | 1 | 138729
7 | 14404 | 1 | 143034
8 | 102 | 1 | 88914
8 | 164 | 1 | 91309
8 | 13601 | 1 | 40504
10 | 1602 | 1 | 182806
10 | 9862 | 1 | 86241
10 | 11431 | 1 | 62112
10 | 13124 | 1 | 29414
11 | 12800 | 1 | 152806
13 | 994 | 1 | 64486
13 | 1603 | 1 | 38191
13 | 4704 | 1 | 77934
13 | 9927 | 1 | 875
14 | 11011 | 1 | 172485
17 | 896 | 1 | 38675
17 | 5507 | 1 | 9600
19 | 353 | 1 | 119305
19 | 1504 | 1 | 81389
19 | 1669 | 1 | 78373
19 | 5893 | 1 | 133707
19 | 9954 | 1 | 92138
19 | 14885 | 1 | 36154
(36 rows)
-- explain without order by
EXPLAIN (COSTS FALSE)
SELECT DISTINCT ON (o_custkey, l_orderkey) o_custkey, l_orderkey, l_linenumber, l_partkey
FROM lineitem_hash_part JOIN orders_hash_part ON (l_orderkey = o_orderkey)
WHERE o_custkey < 20;
QUERY PLAN
----------------------------------------------------------------------------------------------------------------
Unique
-> Sort
Sort Key: remote_scan.o_custkey, remote_scan.l_orderkey
-> Custom Scan (Citus Real-Time)
Task Count: 4
Tasks Shown: One of 4
-> Task
Node: host=localhost port=57637 dbname=regression
-> Unique
-> Sort
Sort Key: orders_hash_part.o_custkey, lineitem_hash_part.l_orderkey
-> Hash Join
Hash Cond: (lineitem_hash_part.l_orderkey = orders_hash_part.o_orderkey)
-> Seq Scan on lineitem_hash_part_360290 lineitem_hash_part
-> Hash
-> Seq Scan on orders_hash_part_360294 orders_hash_part
Filter: (o_custkey < 20)
(17 rows)
-- each customer's each order's last l_partkey
SELECT DISTINCT ON (o_custkey, l_orderkey) o_custkey, l_orderkey, l_linenumber, l_partkey
FROM lineitem_hash_part JOIN orders_hash_part ON (l_orderkey = o_orderkey)
WHERE o_custkey < 15
ORDER BY 1,2,3 DESC;
o_custkey | l_orderkey | l_linenumber | l_partkey
-----------+------------+--------------+-----------
1 | 9154 | 7 | 173448
1 | 14656 | 1 | 59539
2 | 10563 | 4 | 110741
4 | 320 | 2 | 192158
4 | 739 | 5 | 187523
4 | 10688 | 2 | 132574
4 | 10788 | 4 | 196473
4 | 13728 | 3 | 12450
5 | 11682 | 3 | 177152
5 | 11746 | 7 | 193807
5 | 14308 | 3 | 140916
7 | 10402 | 2 | 64514
7 | 13031 | 6 | 7761
7 | 14145 | 6 | 130723
7 | 14404 | 7 | 35349
8 | 102 | 4 | 61158
8 | 164 | 7 | 3037
8 | 13601 | 5 | 12470
10 | 1602 | 1 | 182806
10 | 9862 | 5 | 135675
10 | 11431 | 7 | 8563
10 | 13124 | 3 | 67055
11 | 12800 | 5 | 179110
13 | 994 | 4 | 130471
13 | 1603 | 2 | 65209
13 | 4704 | 3 | 63081
13 | 9927 | 6 | 119356
14 | 11011 | 7 | 95939
(28 rows)
-- subqueries
SELECT DISTINCT l_orderkey, l_partkey
FROM (
SELECT l_orderkey, l_partkey
FROM lineitem_hash_part
) q
ORDER BY 1,2
LIMIT 10;
l_orderkey | l_partkey
------------+-----------
1 | 2132
1 | 15635
1 | 24027
1 | 63700
1 | 67310
1 | 155190
2 | 106170
3 | 4297
3 | 19036
3 | 29380
(10 rows)
EXPLAIN (COSTS FALSE)
SELECT DISTINCT l_orderkey, l_partkey
FROM (
SELECT l_orderkey, l_partkey
FROM lineitem_hash_part
) q
ORDER BY 1,2
LIMIT 10;
QUERY PLAN
----------------------------------------------------------------------------------------------------------
Limit
-> Sort
Sort Key: remote_scan.l_orderkey, remote_scan.l_partkey
-> HashAggregate
Group Key: remote_scan.l_orderkey, remote_scan.l_partkey
-> Custom Scan (Citus Real-Time)
Task Count: 4
Tasks Shown: One of 4
-> Task
Node: host=localhost port=57637 dbname=regression
-> Limit
-> Sort
Sort Key: l_orderkey, l_partkey
-> HashAggregate
Group Key: l_orderkey, l_partkey
-> Seq Scan on lineitem_hash_part_360290 lineitem_hash_part
(16 rows)
SELECT DISTINCT l_orderkey, cnt
FROM (
SELECT l_orderkey, count(*) as cnt
FROM lineitem_hash_part
GROUP BY 1
) q
ORDER BY 1,2
LIMIT 10;
l_orderkey | cnt
------------+-----
1 | 6
2 | 1
3 | 6
4 | 1
5 | 3
6 | 1
7 | 7
32 | 6
33 | 4
34 | 3
(10 rows)
EXPLAIN (COSTS FALSE)
SELECT DISTINCT l_orderkey, cnt
FROM (
SELECT l_orderkey, count(*) as cnt
FROM lineitem_hash_part
GROUP BY 1
) q
ORDER BY 1,2
LIMIT 10;
QUERY PLAN
----------------------------------------------------------------------------------------------------------------
Limit
-> Sort
Sort Key: remote_scan.l_orderkey, remote_scan.cnt
-> HashAggregate
Group Key: remote_scan.l_orderkey, remote_scan.cnt
-> Custom Scan (Citus Real-Time)
Task Count: 4
Tasks Shown: One of 4
-> Task
Node: host=localhost port=57637 dbname=regression
-> Limit
-> Sort
Sort Key: lineitem_hash_part.l_orderkey, (count(*))
-> HashAggregate
Group Key: lineitem_hash_part.l_orderkey, count(*)
-> HashAggregate
Group Key: lineitem_hash_part.l_orderkey
-> Seq Scan on lineitem_hash_part_360290 lineitem_hash_part
(18 rows)
-- distinct on partition column
-- random() is added to inner query to prevent flattening
SELECT DISTINCT ON (l_orderkey) l_orderkey, l_partkey
FROM (
SELECT l_orderkey, l_partkey, (random()*10)::int + 2 as r
FROM lineitem_hash_part
) q
WHERE r > 1
ORDER BY 1,2
LIMIT 10;
l_orderkey | l_partkey
------------+-----------
1 | 2132
2 | 106170
3 | 4297
4 | 88035
5 | 37531
6 | 139636
7 | 79251
32 | 2743
33 | 33918
34 | 88362
(10 rows)
EXPLAIN (COSTS FALSE)
SELECT DISTINCT ON (l_orderkey) l_orderkey, l_partkey
FROM (
SELECT l_orderkey, l_partkey, (random()*10)::int + 2 as r
FROM lineitem_hash_part
) q
WHERE r > 1
ORDER BY 1,2
LIMIT 10;
QUERY PLAN
----------------------------------------------------------------------------------------------------------------
Limit
-> Unique
-> Sort
Sort Key: remote_scan.l_orderkey, remote_scan.l_partkey
-> Custom Scan (Citus Real-Time)
Task Count: 4
Tasks Shown: One of 4
-> Task
Node: host=localhost port=57637 dbname=regression
-> Limit
-> Unique
-> Sort
Sort Key: q.l_orderkey, q.l_partkey
-> Subquery Scan on q
Filter: (q.r > 1)
-> Seq Scan on lineitem_hash_part_360290 lineitem_hash_part
(16 rows)
-- distinct on non-partition column
SELECT DISTINCT ON (l_partkey) l_orderkey, l_partkey
FROM (
SELECT l_orderkey, l_partkey, (random()*10)::int + 2 as r
FROM lineitem_hash_part
) q
WHERE r > 1
ORDER BY 2,1
LIMIT 10;
l_orderkey | l_partkey
------------+-----------
12005 | 18
5121 | 79
2883 | 91
807 | 149
4102 | 175
2117 | 179
548 | 182
2528 | 195
10048 | 204
9413 | 222
(10 rows)
EXPLAIN (COSTS FALSE)
SELECT DISTINCT ON (l_partkey) l_orderkey, l_partkey
FROM (
SELECT l_orderkey, l_partkey, (random()*10)::int + 2 as r
FROM lineitem_hash_part
) q
WHERE r > 1
ORDER BY 2,1
LIMIT 10;
QUERY PLAN
----------------------------------------------------------------------------------------------------------------
Limit
-> Unique
-> Sort
Sort Key: remote_scan.l_partkey, remote_scan.l_orderkey
-> Custom Scan (Citus Real-Time)
Task Count: 4
Tasks Shown: One of 4
-> Task
Node: host=localhost port=57637 dbname=regression
-> Limit
-> Unique
-> Sort
Sort Key: q.l_partkey, q.l_orderkey
-> Subquery Scan on q
Filter: (q.r > 1)
-> Seq Scan on lineitem_hash_part_360290 lineitem_hash_part
(16 rows)

View File

@ -2071,6 +2071,83 @@ FROM
WHERE
users_table.value_1 < 50 AND test_join_function_2(users_table.user_id, temp.user_id);
ERROR: unsupported clause type
-- DISTINCT in the outer query and DISTINCT in the subquery
SELECT
DISTINCT users_ids.user_id
FROM
(SELECT DISTINCT user_id FROM users_table) as users_ids
JOIN
(SELECT
ma.user_id, ma.value_1, (GREATEST(coalesce(ma.value_4 / 250, 0.0) + GREATEST(1.0))) / 2 AS prob
FROM
users_table AS ma, events_table as short_list
WHERE
short_list.user_id = ma.user_id and ma.value_1 < 50 and short_list.event_type < 3
) temp
ON users_ids.user_id = temp.user_id
WHERE temp.value_1 < 50
ORDER BY 1
LIMIT 5;
user_id
---------
1
6
16
21
26
(5 rows)
-- DISTINCT ON in the outer query and DISTINCT in the subquery
SELECT
DISTINCT ON (users_ids.user_id) users_ids.user_id, temp.value_1, prob
FROM
(SELECT DISTINCT user_id FROM users_table) as users_ids
JOIN
(SELECT
ma.user_id, ma.value_1, (GREATEST(coalesce(ma.value_4 / 250, 0.0) + GREATEST(1.0))) / 2 AS prob
FROM
users_table AS ma, events_table as short_list
WHERE
short_list.user_id = ma.user_id and ma.value_1 < 50 and short_list.event_type < 15
) temp
ON users_ids.user_id = temp.user_id
WHERE temp.value_1 < 50
ORDER BY 1, 2
LIMIT 5;
user_id | value_1 | prob
---------+---------+------------------------
1 | 6 | 0.50000000000000000000
2 | 2 | 0.50000000000000000000
4 | 3 | 0.50000000000000000000
6 | 3 | 0.50000000000000000000
7 | 2 | 0.50000000000000000000
(5 rows)
-- DISTINCT ON in the outer query and DISTINCT ON in the subquery
SELECT
DISTINCT ON (users_ids.user_id) users_ids.user_id, temp.value_1, prob
FROM
(SELECT DISTINCT ON (user_id) user_id, value_1 FROM users_table ORDER BY 1,2) as users_ids
JOIN
(SELECT
ma.user_id, ma.value_1, (GREATEST(coalesce(ma.value_4 / 250, 0.0) + GREATEST(1.0))) / 2 AS prob
FROM
users_table AS ma, events_table as short_list
WHERE
short_list.user_id = ma.user_id and ma.value_1 < 25 and short_list.event_type < 3
) temp
ON users_ids.user_id = temp.user_id
ORDER BY 1,2
LIMIT 5;
user_id | value_1 | prob
---------+---------+------------------------
1 | 6 | 0.50000000000000000000
6 | 3 | 0.50000000000000000000
16 | 4 | 0.50000000000000000000
21 | 0 | 0.50000000000000000000
26 | 5 | 0.50000000000000000000
(5 rows)
DROP FUNCTION test_join_function_2(integer, integer);
SET citus.enable_router_execution TO TRUE;
SET citus.subquery_pushdown to OFF;

View File

@ -143,7 +143,7 @@ test: multi_outer_join
# is independed from the rest of the group, it is added to increase parallelism.
# ---
test: multi_create_fdw
test: multi_complex_count_distinct
test: multi_complex_count_distinct multi_select_distinct
test: multi_distribution_metadata
test: multi_generate_ddl_commands
test: multi_create_shards

View File

@ -417,4 +417,68 @@ FROM
-- get some statistics from the aggregated results to ensure the results are correct
SELECT count(*), count(DISTINCT user_id), avg(user_id) FROM agg_results;
-- DISTINCT in the outer query and DISTINCT in the subquery
TRUNCATE agg_results;
INSERT INTO agg_results(user_id)
SELECT
DISTINCT users_ids.user_id
FROM
(SELECT DISTINCT user_id FROM users_table) as users_ids
JOIN
(SELECT
ma.user_id, ma.value_1, (GREATEST(coalesce(ma.value_4 / 250, 0.0) + GREATEST(1.0))) / 2 AS prob
FROM
users_table AS ma, events_table as short_list
WHERE
short_list.user_id = ma.user_id and ma.value_1 < 50 and short_list.event_type < 3
) temp
ON users_ids.user_id = temp.user_id
WHERE temp.value_1 < 50;
-- get some statistics from the aggregated results to ensure the results are correct
SELECT count(*), count(DISTINCT user_id), avg(user_id) FROM agg_results;
-- DISTINCT ON in the outer query and DISTINCT in the subquery
TRUNCATE agg_results;
INSERT INTO agg_results(user_id, value_1_agg, value_2_agg)
SELECT
DISTINCT ON (users_ids.user_id) users_ids.user_id, temp.value_1, prob
FROM
(SELECT DISTINCT user_id FROM users_table) as users_ids
JOIN
(SELECT
ma.user_id, ma.value_1, (GREATEST(coalesce(ma.value_4 / 250, 0.0) + GREATEST(1.0))) / 2 AS prob
FROM
users_table AS ma, events_table as short_list
WHERE
short_list.user_id = ma.user_id and ma.value_1 < 50 and short_list.event_type < 15
) temp
ON users_ids.user_id = temp.user_id
WHERE temp.value_1 < 50
ORDER BY 1, 2;
SELECT count(*), count(DISTINCT user_id), avg(user_id), avg(value_1_agg) FROM agg_results;
-- DISTINCT ON in the outer query and DISTINCT ON in the subquery
TRUNCATE agg_results;
INSERT INTO agg_results(user_id, value_1_agg, value_2_agg)
SELECT
DISTINCT ON (users_ids.user_id) users_ids.user_id, temp.value_1, prob
FROM
(SELECT DISTINCT ON (user_id) user_id, value_2 FROM users_table ORDER BY 1,2) as users_ids
JOIN
(SELECT
ma.user_id, ma.value_1, (GREATEST(coalesce(ma.value_4 / 250, 0.0) + GREATEST(1.0))) / 2 AS prob
FROM
users_table AS ma, events_table as short_list
WHERE
short_list.user_id = ma.user_id and ma.value_1 < 5000 and short_list.event_type < 3
) temp
ON users_ids.user_id = temp.user_id
ORDER BY 1, 2;
SELECT count(*), count(DISTINCT user_id), avg(user_id), avg(value_1_agg) FROM agg_results;

View File

@ -478,13 +478,32 @@ SET client_min_messages TO INFO;
truncate agg_events;
SET client_min_messages TO DEBUG2;
-- we do not support DISTINCT ON clauses
-- DISTINCT ON clauses are supported
-- distinct on(non-partition column)
-- values are pulled to master
INSERT INTO agg_events (value_1_agg, user_id)
SELECT
DISTINCT ON (value_1) value_1, user_id
FROM
raw_events_first;
SELECT user_id, value_1_agg FROM agg_events ORDER BY 1,2;
-- we don't want to see constraint vialotions, so truncate first
SET client_min_messages TO INFO;
truncate agg_events;
SET client_min_messages TO DEBUG2;
-- distinct on(partition column)
-- queries are forwared to workers
INSERT INTO agg_events (value_1_agg, user_id)
SELECT
DISTINCT ON (user_id) value_1, user_id
FROM
raw_events_first;
SELECT user_id, value_1_agg FROM agg_events ORDER BY 1,2;
-- We do not support some CTEs
WITH fist_table_agg AS
(SELECT sum(value_1) as v1_agg, user_id FROM raw_events_first GROUP BY user_id)

View File

@ -0,0 +1,245 @@
--
-- MULTI_SELECT_DISTINCT
--
-- Tests select distinct, and select distinct on features.
--
-- function calls are supported
SELECT DISTINCT l_orderkey, now() FROM lineitem_hash_part LIMIT 0;
SELECT DISTINCT l_partkey, 1 + (random() * 0)::int FROM lineitem_hash_part ORDER BY 1 DESC LIMIT 3;
-- const expressions are supported
SELECT DISTINCT l_orderkey, 1+1 FROM lineitem_hash_part ORDER BY 1 LIMIT 5;
-- non const expressions are also supported
SELECT DISTINCT l_orderkey, l_partkey + 1 FROM lineitem_hash_part ORDER BY 1, 2 LIMIT 5;
-- column expressions are supported
SELECT DISTINCT l_orderkey, l_shipinstruct || l_shipmode FROM lineitem_hash_part ORDER BY 2 , 1 LIMIT 5;
-- function calls with const input are supported
SELECT DISTINCT l_orderkey, strpos('AIR', 'A') FROM lineitem_hash_part ORDER BY 1,2 LIMIT 5;
-- function calls with non-const input are supported
SELECT DISTINCT l_orderkey, strpos(l_shipmode, 'I')
FROM lineitem_hash_part
WHERE strpos(l_shipmode, 'I') > 1
ORDER BY 2, 1
LIMIT 5;
-- distinct on partition column
-- verify counts match with respect to count(distinct)
CREATE TEMP TABLE temp_orderkeys AS SELECT DISTINCT l_orderkey FROM lineitem_hash_part;
SELECT COUNT(*) FROM temp_orderkeys;
SELECT COUNT(DISTINCT l_orderkey) FROM lineitem_hash_part;
SELECT DISTINCT l_orderkey FROM lineitem_hash_part WHERE l_orderkey < 500 and l_partkey < 5000 order by 1;
-- distinct on non-partition column
SELECT DISTINCT l_partkey FROM lineitem_hash_part WHERE l_orderkey > 5 and l_orderkey < 20 order by 1;
SELECT DISTINCT l_shipmode FROM lineitem_hash_part ORDER BY 1 DESC;
-- distinct with multiple columns
SELECT DISTINCT l_orderkey, o_orderdate
FROM lineitem_hash_part JOIN orders_hash_part ON (l_orderkey = o_orderkey)
WHERE l_orderkey < 10
ORDER BY l_orderkey;
-- distinct on partition column with aggregate
-- this is the same as the one without distinct due to group by
SELECT DISTINCT l_orderkey, count(*)
FROM lineitem_hash_part
WHERE l_orderkey < 200
GROUP BY 1
HAVING count(*) > 5
ORDER BY 2 DESC, 1;
-- explain the query to see actual plan
EXPLAIN (COSTS FALSE)
SELECT DISTINCT l_orderkey, count(*)
FROM lineitem_hash_part
WHERE l_orderkey < 200
GROUP BY 1
HAVING count(*) > 5
ORDER BY 2 DESC, 1;
-- distinct on non-partition column with aggregate
-- this is the same as non-distinct version due to group by
SELECT DISTINCT l_partkey, count(*)
FROM lineitem_hash_part
GROUP BY 1
HAVING count(*) > 2
ORDER BY 1;
-- explain the query to see actual plan
EXPLAIN (COSTS FALSE)
SELECT DISTINCT l_partkey, count(*)
FROM lineitem_hash_part
GROUP BY 1
HAVING count(*) > 2
ORDER BY 1;
-- distinct on non-partition column and avg
SELECT DISTINCT l_partkey, avg(l_linenumber)
FROM lineitem_hash_part
WHERE l_partkey < 500
GROUP BY 1
HAVING avg(l_linenumber) > 2
ORDER BY 1;
-- distinct on multiple non-partition columns
SELECT DISTINCT l_partkey, l_suppkey
FROM lineitem_hash_part
WHERE l_shipmode = 'AIR' AND l_orderkey < 100
ORDER BY 1, 2;
EXPLAIN (COSTS FALSE)
SELECT DISTINCT l_partkey, l_suppkey
FROM lineitem_hash_part
WHERE l_shipmode = 'AIR' AND l_orderkey < 100
ORDER BY 1, 2;
-- distinct on partition column
SELECT DISTINCT ON (l_orderkey) l_orderkey, l_partkey, l_suppkey
FROM lineitem_hash_part
WHERE l_orderkey < 35
ORDER BY 1;
EXPLAIN (COSTS FALSE)
SELECT DISTINCT ON (l_orderkey) l_orderkey, l_partkey, l_suppkey
FROM lineitem_hash_part
WHERE l_orderkey < 35
ORDER BY 1;
-- distinct on non-partition column
-- note order by is required here
-- otherwise query results will be different since
-- distinct on clause is on non-partition column
SELECT DISTINCT ON (l_partkey) l_partkey, l_orderkey
FROM lineitem_hash_part
ORDER BY 1,2
LIMIT 20;
EXPLAIN (COSTS FALSE)
SELECT DISTINCT ON (l_partkey) l_partkey, l_orderkey
FROM lineitem_hash_part
ORDER BY 1,2
LIMIT 20;
-- distinct on with joins
-- each customer's first order key
SELECT DISTINCT ON (o_custkey) o_custkey, l_orderkey
FROM lineitem_hash_part JOIN orders_hash_part ON (l_orderkey = o_orderkey)
WHERE o_custkey < 15
ORDER BY 1,2;
EXPLAIN (COSTS FALSE)
SELECT DISTINCT ON (o_custkey) o_custkey, l_orderkey
FROM lineitem_hash_part JOIN orders_hash_part ON (l_orderkey = o_orderkey)
WHERE o_custkey < 15
ORDER BY 1,2;
-- explain without order by
-- notice master plan has order by on distinct on column
EXPLAIN (COSTS FALSE)
SELECT DISTINCT ON (o_custkey) o_custkey, l_orderkey
FROM lineitem_hash_part JOIN orders_hash_part ON (l_orderkey = o_orderkey)
WHERE o_custkey < 15;
-- each customer's each order's first l_partkey
SELECT DISTINCT ON (o_custkey, l_orderkey) o_custkey, l_orderkey, l_linenumber, l_partkey
FROM lineitem_hash_part JOIN orders_hash_part ON (l_orderkey = o_orderkey)
WHERE o_custkey < 20
ORDER BY 1,2,3;
-- explain without order by
EXPLAIN (COSTS FALSE)
SELECT DISTINCT ON (o_custkey, l_orderkey) o_custkey, l_orderkey, l_linenumber, l_partkey
FROM lineitem_hash_part JOIN orders_hash_part ON (l_orderkey = o_orderkey)
WHERE o_custkey < 20;
-- each customer's each order's last l_partkey
SELECT DISTINCT ON (o_custkey, l_orderkey) o_custkey, l_orderkey, l_linenumber, l_partkey
FROM lineitem_hash_part JOIN orders_hash_part ON (l_orderkey = o_orderkey)
WHERE o_custkey < 15
ORDER BY 1,2,3 DESC;
-- subqueries
SELECT DISTINCT l_orderkey, l_partkey
FROM (
SELECT l_orderkey, l_partkey
FROM lineitem_hash_part
) q
ORDER BY 1,2
LIMIT 10;
EXPLAIN (COSTS FALSE)
SELECT DISTINCT l_orderkey, l_partkey
FROM (
SELECT l_orderkey, l_partkey
FROM lineitem_hash_part
) q
ORDER BY 1,2
LIMIT 10;
SELECT DISTINCT l_orderkey, cnt
FROM (
SELECT l_orderkey, count(*) as cnt
FROM lineitem_hash_part
GROUP BY 1
) q
ORDER BY 1,2
LIMIT 10;
EXPLAIN (COSTS FALSE)
SELECT DISTINCT l_orderkey, cnt
FROM (
SELECT l_orderkey, count(*) as cnt
FROM lineitem_hash_part
GROUP BY 1
) q
ORDER BY 1,2
LIMIT 10;
-- distinct on partition column
-- random() is added to inner query to prevent flattening
SELECT DISTINCT ON (l_orderkey) l_orderkey, l_partkey
FROM (
SELECT l_orderkey, l_partkey, (random()*10)::int + 2 as r
FROM lineitem_hash_part
) q
WHERE r > 1
ORDER BY 1,2
LIMIT 10;
EXPLAIN (COSTS FALSE)
SELECT DISTINCT ON (l_orderkey) l_orderkey, l_partkey
FROM (
SELECT l_orderkey, l_partkey, (random()*10)::int + 2 as r
FROM lineitem_hash_part
) q
WHERE r > 1
ORDER BY 1,2
LIMIT 10;
-- distinct on non-partition column
SELECT DISTINCT ON (l_partkey) l_orderkey, l_partkey
FROM (
SELECT l_orderkey, l_partkey, (random()*10)::int + 2 as r
FROM lineitem_hash_part
) q
WHERE r > 1
ORDER BY 2,1
LIMIT 10;
EXPLAIN (COSTS FALSE)
SELECT DISTINCT ON (l_partkey) l_orderkey, l_partkey
FROM (
SELECT l_orderkey, l_partkey, (random()*10)::int + 2 as r
FROM lineitem_hash_part
) q
WHERE r > 1
ORDER BY 2,1
LIMIT 10;

View File

@ -1659,8 +1659,60 @@ FROM
WHERE
users_table.value_1 < 50 AND test_join_function_2(users_table.user_id, temp.user_id);
-- DISTINCT in the outer query and DISTINCT in the subquery
SELECT
DISTINCT users_ids.user_id
FROM
(SELECT DISTINCT user_id FROM users_table) as users_ids
JOIN
(SELECT
ma.user_id, ma.value_1, (GREATEST(coalesce(ma.value_4 / 250, 0.0) + GREATEST(1.0))) / 2 AS prob
FROM
users_table AS ma, events_table as short_list
WHERE
short_list.user_id = ma.user_id and ma.value_1 < 50 and short_list.event_type < 3
) temp
ON users_ids.user_id = temp.user_id
WHERE temp.value_1 < 50
ORDER BY 1
LIMIT 5;
-- DISTINCT ON in the outer query and DISTINCT in the subquery
SELECT
DISTINCT ON (users_ids.user_id) users_ids.user_id, temp.value_1, prob
FROM
(SELECT DISTINCT user_id FROM users_table) as users_ids
JOIN
(SELECT
ma.user_id, ma.value_1, (GREATEST(coalesce(ma.value_4 / 250, 0.0) + GREATEST(1.0))) / 2 AS prob
FROM
users_table AS ma, events_table as short_list
WHERE
short_list.user_id = ma.user_id and ma.value_1 < 50 and short_list.event_type < 15
) temp
ON users_ids.user_id = temp.user_id
WHERE temp.value_1 < 50
ORDER BY 1, 2
LIMIT 5;
-- DISTINCT ON in the outer query and DISTINCT ON in the subquery
SELECT
DISTINCT ON (users_ids.user_id) users_ids.user_id, temp.value_1, prob
FROM
(SELECT DISTINCT ON (user_id) user_id, value_1 FROM users_table ORDER BY 1,2) as users_ids
JOIN
(SELECT
ma.user_id, ma.value_1, (GREATEST(coalesce(ma.value_4 / 250, 0.0) + GREATEST(1.0))) / 2 AS prob
FROM
users_table AS ma, events_table as short_list
WHERE
short_list.user_id = ma.user_id and ma.value_1 < 25 and short_list.event_type < 3
) temp
ON users_ids.user_id = temp.user_id
ORDER BY 1,2
LIMIT 5;
DROP FUNCTION test_join_function_2(integer, integer);
SET citus.enable_router_execution TO TRUE;
SET citus.subquery_pushdown to OFF;