From b21b6905ae80d0d2b4f5d6b9a0661e4cf9a58e4d Mon Sep 17 00:00:00 2001 From: Marco Slot Date: Sat, 14 Dec 2019 10:04:14 +0100 Subject: [PATCH] Do not repeat GROUP BY distribution_column on coordinator Allow arbitrary aggregates to be pushed down in these scenarios --- .../planner/extended_op_node_utils.c | 83 +++- .../planner/multi_logical_optimizer.c | 86 ++-- .../regress/expected/aggregate_support.out | 24 +- .../expected/custom_aggregate_support.out | 457 ++++++++---------- .../expected/multi_having_pushdown.out | 137 +++--- .../expected/multi_orderby_limit_pushdown.out | 212 ++++---- .../multi_repartition_join_planning.out | 16 +- .../expected/multi_select_distinct.out | 172 +++---- src/test/regress/expected/multi_view.out | 48 +- .../expected/subqueries_not_supported.out | 4 +- .../multi_complex_count_distinct.source | 139 +++--- src/test/regress/sql/aggregate_support.sql | 8 + .../regress/sql/subqueries_not_supported.sql | 4 +- 13 files changed, 702 insertions(+), 688 deletions(-) diff --git a/src/backend/distributed/planner/extended_op_node_utils.c b/src/backend/distributed/planner/extended_op_node_utils.c index d6db5ef72..0b0fea1f6 100644 --- a/src/backend/distributed/planner/extended_op_node_utils.c +++ b/src/backend/distributed/planner/extended_op_node_utils.c @@ -11,6 +11,7 @@ #include "postgres.h" #include "distributed/extended_op_node_utils.h" +#include "distributed/listutils.h" #include "distributed/metadata_cache.h" #include "distributed/multi_logical_optimizer.h" #include "distributed/pg_dist_partition.h" @@ -24,8 +25,7 @@ #include "nodes/pg_list.h" -static bool GroupedByDisjointPartitionColumn(List *tableNodeList, - MultiExtendedOp *opNode); +static bool GroupedByPartitionColumn(MultiNode *node, MultiExtendedOp *opNode); static bool ExtendedOpNodeContainsRepartitionSubquery(MultiExtendedOp *originalOpNode); static bool HasNonPartitionColumnDistinctAgg(List *targetEntryList, Node *havingQual, @@ -46,11 +46,9 @@ BuildExtendedOpNodeProperties(MultiExtendedOp *extendedOpNode) { ExtendedOpNodeProperties extendedOpNodeProperties; - List *tableNodeList = FindNodesOfType((MultiNode *) extendedOpNode, T_MultiTable); - bool groupedByDisjointPartitionColumn = GroupedByDisjointPartitionColumn( - tableNodeList, - extendedOpNode); + bool groupedByDisjointPartitionColumn = + GroupedByPartitionColumn((MultiNode *) extendedOpNode, extendedOpNode); bool repartitionSubquery = ExtendedOpNodeContainsRepartitionSubquery(extendedOpNode); @@ -83,41 +81,86 @@ BuildExtendedOpNodeProperties(MultiExtendedOp *extendedOpNode) /* - * GroupedByDisjointPartitionColumn returns true if the query is grouped by the - * partition column of a table whose shards have disjoint sets of partition values. + * GroupedByPartitionColumn returns true if a GROUP BY in the opNode contains + * the partition column of the underlying relation, which is determined by + * searching the MultiNode tree for a MultiTable and MultiPartition with + * a matching column. + * + * When there is a re-partition join, the search terminates at the + * MultiPartition node. Hence we can push down the GROUP BY if the join + * column is in the GROUP BY. */ static bool -GroupedByDisjointPartitionColumn(List *tableNodeList, MultiExtendedOp *opNode) +GroupedByPartitionColumn(MultiNode *node, MultiExtendedOp *opNode) { - bool result = false; - ListCell *tableNodeCell = NULL; - - foreach(tableNodeCell, tableNodeList) + if (node == NULL) { - MultiTable *tableNode = (MultiTable *) lfirst(tableNodeCell); + return false; + } + + if (CitusIsA(node, MultiTable)) + { + MultiTable *tableNode = (MultiTable *) node; + Oid relationId = tableNode->relationId; - if (relationId == SUBQUERY_RELATION_ID || !IsDistributedTable(relationId)) + if (relationId == SUBQUERY_RELATION_ID || + relationId == SUBQUERY_PUSHDOWN_RELATION_ID) { - continue; + /* ignore subqueries for now */ + return false; } char partitionMethod = PartitionMethod(relationId); if (partitionMethod != DISTRIBUTE_BY_RANGE && partitionMethod != DISTRIBUTE_BY_HASH) { - continue; + /* only range- and hash-distributed tables are strictly partitioned */ + return false; } if (GroupedByColumn(opNode->groupClauseList, opNode->targetList, tableNode->partitionColumn)) { - result = true; - break; + /* this node is partitioned by a column in the GROUP BY */ + return true; + } + } + else if (CitusIsA(node, MultiPartition)) + { + MultiPartition *partitionNode = (MultiPartition *) node; + + if (GroupedByColumn(opNode->groupClauseList, opNode->targetList, + partitionNode->partitionColumn)) + { + /* this node is partitioned by a column in the GROUP BY */ + return true; + } + } + else if (UnaryOperator(node)) + { + MultiNode *childNode = ((MultiUnaryNode *) node)->childNode; + + if (GroupedByPartitionColumn(childNode, opNode)) + { + /* a child node is partitioned by a column in the GROUP BY */ + return true; + } + } + else if (BinaryOperator(node)) + { + MultiNode *leftChildNode = ((MultiBinaryNode *) node)->leftChildNode; + MultiNode *rightChildNode = ((MultiBinaryNode *) node)->rightChildNode; + + if (GroupedByPartitionColumn(leftChildNode, opNode) || + GroupedByPartitionColumn(rightChildNode, opNode)) + { + /* a child node is partitioned by a column in the GROUP BY */ + return true; } } - return result; + return false; } diff --git a/src/backend/distributed/planner/multi_logical_optimizer.c b/src/backend/distributed/planner/multi_logical_optimizer.c index e58c32623..5a41e91bc 100644 --- a/src/backend/distributed/planner/multi_logical_optimizer.c +++ b/src/backend/distributed/planner/multi_logical_optimizer.c @@ -316,8 +316,16 @@ MultiLogicalPlanOptimize(MultiTreeRoot *multiLogicalPlan) ListCell *tableNodeCell = NULL; MultiNode *logicalPlanNode = (MultiNode *) multiLogicalPlan; - /* check that we can optimize aggregates in the plan */ - ErrorIfContainsUnsupportedAggregate(logicalPlanNode); + List *extendedOpNodeList = FindNodesOfType(logicalPlanNode, T_MultiExtendedOp); + MultiExtendedOp *extendedOpNode = (MultiExtendedOp *) linitial(extendedOpNodeList); + ExtendedOpNodeProperties extendedOpNodeProperties = BuildExtendedOpNodeProperties( + extendedOpNode); + + if (!extendedOpNodeProperties.groupedByDisjointPartitionColumn) + { + /* check that we can optimize aggregates in the plan */ + ErrorIfContainsUnsupportedAggregate(logicalPlanNode); + } /* * If a select node exists, we use the idempower property to split the node @@ -374,11 +382,6 @@ MultiLogicalPlanOptimize(MultiTreeRoot *multiLogicalPlan) * clause list to the worker operator node. We then push the worker operator * node below the collect node. */ - List *extendedOpNodeList = FindNodesOfType(logicalPlanNode, T_MultiExtendedOp); - MultiExtendedOp *extendedOpNode = (MultiExtendedOp *) linitial(extendedOpNodeList); - - ExtendedOpNodeProperties extendedOpNodeProperties = BuildExtendedOpNodeProperties( - extendedOpNode); MultiExtendedOp *masterExtendedOpNode = MasterExtendedOpNode(extendedOpNode, &extendedOpNodeProperties); @@ -1359,6 +1362,7 @@ MasterExtendedOpNode(MultiExtendedOp *originalOpNode, { List *targetEntryList = originalOpNode->targetList; List *newTargetEntryList = NIL; + List *newGroupClauseList = NIL; ListCell *targetEntryCell = NULL; Node *originalHavingQual = originalOpNode->havingQual; Node *newHavingQual = NULL; @@ -1383,7 +1387,8 @@ MasterExtendedOpNode(MultiExtendedOp *originalOpNode, * if the aggregate belongs to a window function, it is not mutated, but pushed * down to worker as it is. Master query should treat that as a Var. */ - if (hasAggregates && !hasWindowFunction) + if (hasAggregates && !hasWindowFunction && + !extendedOpNodeProperties->groupedByDisjointPartitionColumn) { Node *newNode = MasterAggregateMutator((Node *) originalExpression, walkerContext); @@ -1392,8 +1397,9 @@ MasterExtendedOpNode(MultiExtendedOp *originalOpNode, else { /* - * The expression does not have any aggregates. We simply make it - * reference the output generated by worker nodes. + * The expression does not have any aggregates or the group by + * is on the partition column. We simply make it reference the + * output generated by worker nodes. */ const uint32 masterTableId = 1; /* only one table on master node */ @@ -1414,14 +1420,23 @@ MasterExtendedOpNode(MultiExtendedOp *originalOpNode, newTargetEntryList = lappend(newTargetEntryList, newTargetEntry); } - if (originalHavingQual != NULL) + if (!extendedOpNodeProperties->groupedByDisjointPartitionColumn) { - newHavingQual = MasterAggregateMutator(originalHavingQual, walkerContext); + /* + * Not pushing down GROUP BY, need to regroup on coordinator + * and apply having on the coordinator. + */ + newGroupClauseList = originalOpNode->groupClauseList; + + if (originalHavingQual != NULL) + { + newHavingQual = MasterAggregateMutator(originalHavingQual, walkerContext); + } } MultiExtendedOp *masterExtendedOpNode = CitusMakeNode(MultiExtendedOp); masterExtendedOpNode->targetList = newTargetEntryList; - masterExtendedOpNode->groupClauseList = originalOpNode->groupClauseList; + masterExtendedOpNode->groupClauseList = newGroupClauseList; masterExtendedOpNode->sortClauseList = originalOpNode->sortClauseList; masterExtendedOpNode->distinctClause = originalOpNode->distinctClause; masterExtendedOpNode->hasDistinctOn = originalOpNode->hasDistinctOn; @@ -2213,11 +2228,15 @@ ProcessTargetListForWorkerQuery(List *targetEntryList, workerAggContext->createGroupByClause = false; /* - * If the expression uses aggregates inside window function contain agg - * clause still returns true. We want to make sure it is not a part of - * window function before we proceed. + * If the query has a window function, we currently assume it's safe to push + * down the target list. + * + * If there are aggregates without a GROUP BY on the distribution column + * then the results of those aggregates need to be combined on the coordinator. + * In that case we rewrite the expressions using WorkerAggregateWalker. */ - if (hasAggregates && !hasWindowFunction) + if (!hasWindowFunction && hasAggregates && + !extendedOpNodeProperties->groupedByDisjointPartitionColumn) { WorkerAggregateWalker((Node *) originalExpression, workerAggContext); @@ -2245,11 +2264,6 @@ ProcessTargetListForWorkerQuery(List *targetEntryList, * having clause is safe to pushdown to the workers, workerHavingQual is set to * be the original having clause. * - * TODO: Citus currently always pulls the expressions in the having clause to the - * coordinator and apply it on the coordinator. Do we really need to pull those - * expressions to the coordinator and apply the having on the coordinator if we're - * already pushing down the HAVING clause? - * * inputs: originalHavingQual, extendedOpNodeProperties * outputs: workerHavingQual, queryTargetList, queryGroupClause */ @@ -2269,18 +2283,26 @@ ProcessHavingClauseForWorkerQuery(Node *originalHavingQual, *workerHavingQual = NULL; - WorkerAggregateWalkerContext *workerAggContext = palloc0( - sizeof(WorkerAggregateWalkerContext)); - workerAggContext->expressionList = NIL; - workerAggContext->pullDistinctColumns = extendedOpNodeProperties->pullDistinctColumns; - workerAggContext->createGroupByClause = false; + if (!extendedOpNodeProperties->groupedByDisjointPartitionColumn) + { + /* + * If the GROUP BY or PARTITION BY is not on the distribution column + * then we need to combine the aggregates in the HAVING across shards. + */ + WorkerAggregateWalkerContext *workerAggContext = palloc0( + sizeof(WorkerAggregateWalkerContext)); + workerAggContext->expressionList = NIL; + workerAggContext->pullDistinctColumns = + extendedOpNodeProperties->pullDistinctColumns; + workerAggContext->createGroupByClause = false; - WorkerAggregateWalker(originalHavingQual, workerAggContext); - List *newExpressionList = workerAggContext->expressionList; + WorkerAggregateWalker(originalHavingQual, workerAggContext); + List *newExpressionList = workerAggContext->expressionList; - ExpandWorkerTargetEntry(newExpressionList, targetEntry, - workerAggContext->createGroupByClause, - queryTargetList, queryGroupClause); + ExpandWorkerTargetEntry(newExpressionList, targetEntry, + workerAggContext->createGroupByClause, + queryTargetList, queryGroupClause); + } /* * If grouped by a partition column whose values are shards have disjoint sets diff --git a/src/test/regress/expected/aggregate_support.out b/src/test/regress/expected/aggregate_support.out index e33f16a1f..e5b8eeec9 100644 --- a/src/test/regress/expected/aggregate_support.out +++ b/src/test/regress/expected/aggregate_support.out @@ -150,6 +150,28 @@ SELECT create_distributed_function('last(anyelement)'); SELECT key, first(val ORDER BY id), last(val ORDER BY id) FROM aggdata GROUP BY key ORDER BY key; ERROR: unsupported aggregate function first +-- However, GROUP BY on distribution column gets pushed down +SELECT id, first(val ORDER BY key), last(val ORDER BY key) +FROM aggdata GROUP BY id ORDER BY id; + id | first | last +----+-------+------ + 1 | 2 | 2 + 2 | | + 3 | 2 | 2 + 4 | 3 | 3 + 5 | 5 | 5 + 6 | 4 | 4 + 7 | | + 8 | | + 9 | | + 10 | 8 | 8 + 11 | 0 | 0 +(11 rows) + +-- Test that expressions don't slip past. This fails +SELECT id%5, first(val ORDER BY key), last(val ORDER BY key) +FROM aggdata GROUP BY id%5 ORDER BY id%5; +ERROR: unsupported aggregate function first -- test aggregate with stype which is not a by-value datum -- also test our handling of the aggregate not existing on workers create function sumstring_sfunc(state text, x text) @@ -165,7 +187,7 @@ create aggregate sumstring(text) ( ); select sumstring(valf::text) from aggdata where valf is not null; ERROR: function "aggregate_support.sumstring(text)" does not exist -CONTEXT: while executing command on localhost:57638 +CONTEXT: while executing command on localhost:57637 select create_distributed_function('sumstring(text)'); create_distributed_function ----------------------------- diff --git a/src/test/regress/expected/custom_aggregate_support.out b/src/test/regress/expected/custom_aggregate_support.out index e8da03c07..ecc7cdc47 100644 --- a/src/test/regress/expected/custom_aggregate_support.out +++ b/src/test/regress/expected/custom_aggregate_support.out @@ -108,34 +108,32 @@ SELECT FROM daily_uniques GROUP BY(1); - QUERY PLAN ------------------------------------------------------------------------- - HashAggregate - Group Key: remote_scan.day - -> Custom Scan (Citus Adaptive) - Task Count: 4 - Tasks Shown: All - -> Task - Node: host=localhost port=57637 dbname=regression - -> HashAggregate - Group Key: day - -> Seq Scan on daily_uniques_360615 daily_uniques - -> Task - Node: host=localhost port=57638 dbname=regression - -> HashAggregate - Group Key: day - -> Seq Scan on daily_uniques_360616 daily_uniques - -> Task - Node: host=localhost port=57637 dbname=regression - -> HashAggregate - Group Key: day - -> Seq Scan on daily_uniques_360617 daily_uniques - -> Task - Node: host=localhost port=57638 dbname=regression - -> HashAggregate - Group Key: day - -> Seq Scan on daily_uniques_360618 daily_uniques -(25 rows) + QUERY PLAN +------------------------------------------------------------------ + Custom Scan (Citus Adaptive) + Task Count: 4 + Tasks Shown: All + -> Task + Node: host=localhost port=57637 dbname=regression + -> HashAggregate + Group Key: day + -> Seq Scan on daily_uniques_360615 daily_uniques + -> Task + Node: host=localhost port=57638 dbname=regression + -> HashAggregate + Group Key: day + -> Seq Scan on daily_uniques_360616 daily_uniques + -> Task + Node: host=localhost port=57637 dbname=regression + -> HashAggregate + Group Key: day + -> Seq Scan on daily_uniques_360617 daily_uniques + -> Task + Node: host=localhost port=57638 dbname=regression + -> HashAggregate + Group Key: day + -> Seq Scan on daily_uniques_360618 daily_uniques +(23 rows) SET hll.force_groupagg to ON; EXPLAIN(COSTS OFF) @@ -144,36 +142,32 @@ SELECT FROM daily_uniques GROUP BY(1); - QUERY PLAN ------------------------------------------------------------------------------- - GroupAggregate - Group Key: remote_scan.day - -> Sort - Sort Key: remote_scan.day - -> Custom Scan (Citus Adaptive) - Task Count: 4 - Tasks Shown: All - -> Task - Node: host=localhost port=57637 dbname=regression - -> HashAggregate - Group Key: day - -> Seq Scan on daily_uniques_360615 daily_uniques - -> Task - Node: host=localhost port=57638 dbname=regression - -> HashAggregate - Group Key: day - -> Seq Scan on daily_uniques_360616 daily_uniques - -> Task - Node: host=localhost port=57637 dbname=regression - -> HashAggregate - Group Key: day - -> Seq Scan on daily_uniques_360617 daily_uniques - -> Task - Node: host=localhost port=57638 dbname=regression - -> HashAggregate - Group Key: day - -> Seq Scan on daily_uniques_360618 daily_uniques -(27 rows) + QUERY PLAN +------------------------------------------------------------------ + Custom Scan (Citus Adaptive) + Task Count: 4 + Tasks Shown: All + -> Task + Node: host=localhost port=57637 dbname=regression + -> HashAggregate + Group Key: day + -> Seq Scan on daily_uniques_360615 daily_uniques + -> Task + Node: host=localhost port=57638 dbname=regression + -> HashAggregate + Group Key: day + -> Seq Scan on daily_uniques_360616 daily_uniques + -> Task + Node: host=localhost port=57637 dbname=regression + -> HashAggregate + Group Key: day + -> Seq Scan on daily_uniques_360617 daily_uniques + -> Task + Node: host=localhost port=57638 dbname=regression + -> HashAggregate + Group Key: day + -> Seq Scan on daily_uniques_360618 daily_uniques +(23 rows) -- Test disabling hash_agg with operator on coordinator query SET hll.force_groupagg to OFF; @@ -183,34 +177,32 @@ SELECT FROM daily_uniques GROUP BY(1); - QUERY PLAN ------------------------------------------------------------------------- - HashAggregate - Group Key: remote_scan.day - -> Custom Scan (Citus Adaptive) - Task Count: 4 - Tasks Shown: All - -> Task - Node: host=localhost port=57637 dbname=regression - -> HashAggregate - Group Key: day - -> Seq Scan on daily_uniques_360615 daily_uniques - -> Task - Node: host=localhost port=57638 dbname=regression - -> HashAggregate - Group Key: day - -> Seq Scan on daily_uniques_360616 daily_uniques - -> Task - Node: host=localhost port=57637 dbname=regression - -> HashAggregate - Group Key: day - -> Seq Scan on daily_uniques_360617 daily_uniques - -> Task - Node: host=localhost port=57638 dbname=regression - -> HashAggregate - Group Key: day - -> Seq Scan on daily_uniques_360618 daily_uniques -(25 rows) + QUERY PLAN +------------------------------------------------------------------ + Custom Scan (Citus Adaptive) + Task Count: 4 + Tasks Shown: All + -> Task + Node: host=localhost port=57637 dbname=regression + -> HashAggregate + Group Key: day + -> Seq Scan on daily_uniques_360615 daily_uniques + -> Task + Node: host=localhost port=57638 dbname=regression + -> HashAggregate + Group Key: day + -> Seq Scan on daily_uniques_360616 daily_uniques + -> Task + Node: host=localhost port=57637 dbname=regression + -> HashAggregate + Group Key: day + -> Seq Scan on daily_uniques_360617 daily_uniques + -> Task + Node: host=localhost port=57638 dbname=regression + -> HashAggregate + Group Key: day + -> Seq Scan on daily_uniques_360618 daily_uniques +(23 rows) SET hll.force_groupagg to ON; EXPLAIN(COSTS OFF) @@ -219,36 +211,32 @@ SELECT FROM daily_uniques GROUP BY(1); - QUERY PLAN ------------------------------------------------------------------------------- - GroupAggregate - Group Key: remote_scan.day - -> Sort - Sort Key: remote_scan.day - -> Custom Scan (Citus Adaptive) - Task Count: 4 - Tasks Shown: All - -> Task - Node: host=localhost port=57637 dbname=regression - -> HashAggregate - Group Key: day - -> Seq Scan on daily_uniques_360615 daily_uniques - -> Task - Node: host=localhost port=57638 dbname=regression - -> HashAggregate - Group Key: day - -> Seq Scan on daily_uniques_360616 daily_uniques - -> Task - Node: host=localhost port=57637 dbname=regression - -> HashAggregate - Group Key: day - -> Seq Scan on daily_uniques_360617 daily_uniques - -> Task - Node: host=localhost port=57638 dbname=regression - -> HashAggregate - Group Key: day - -> Seq Scan on daily_uniques_360618 daily_uniques -(27 rows) + QUERY PLAN +------------------------------------------------------------------ + Custom Scan (Citus Adaptive) + Task Count: 4 + Tasks Shown: All + -> Task + Node: host=localhost port=57637 dbname=regression + -> HashAggregate + Group Key: day + -> Seq Scan on daily_uniques_360615 daily_uniques + -> Task + Node: host=localhost port=57638 dbname=regression + -> HashAggregate + Group Key: day + -> Seq Scan on daily_uniques_360616 daily_uniques + -> Task + Node: host=localhost port=57637 dbname=regression + -> HashAggregate + Group Key: day + -> Seq Scan on daily_uniques_360617 daily_uniques + -> Task + Node: host=localhost port=57638 dbname=regression + -> HashAggregate + Group Key: day + -> Seq Scan on daily_uniques_360618 daily_uniques +(23 rows) -- Test disabling hash_agg with expression on coordinator query SET hll.force_groupagg to OFF; @@ -258,34 +246,32 @@ SELECT FROM daily_uniques GROUP BY(1); - QUERY PLAN ------------------------------------------------------------------------- - HashAggregate - Group Key: remote_scan.day - -> Custom Scan (Citus Adaptive) - Task Count: 4 - Tasks Shown: All - -> Task - Node: host=localhost port=57637 dbname=regression - -> HashAggregate - Group Key: day - -> Seq Scan on daily_uniques_360615 daily_uniques - -> Task - Node: host=localhost port=57638 dbname=regression - -> HashAggregate - Group Key: day - -> Seq Scan on daily_uniques_360616 daily_uniques - -> Task - Node: host=localhost port=57637 dbname=regression - -> HashAggregate - Group Key: day - -> Seq Scan on daily_uniques_360617 daily_uniques - -> Task - Node: host=localhost port=57638 dbname=regression - -> HashAggregate - Group Key: day - -> Seq Scan on daily_uniques_360618 daily_uniques -(25 rows) + QUERY PLAN +------------------------------------------------------------------ + Custom Scan (Citus Adaptive) + Task Count: 4 + Tasks Shown: All + -> Task + Node: host=localhost port=57637 dbname=regression + -> HashAggregate + Group Key: day + -> Seq Scan on daily_uniques_360615 daily_uniques + -> Task + Node: host=localhost port=57638 dbname=regression + -> HashAggregate + Group Key: day + -> Seq Scan on daily_uniques_360616 daily_uniques + -> Task + Node: host=localhost port=57637 dbname=regression + -> HashAggregate + Group Key: day + -> Seq Scan on daily_uniques_360617 daily_uniques + -> Task + Node: host=localhost port=57638 dbname=regression + -> HashAggregate + Group Key: day + -> Seq Scan on daily_uniques_360618 daily_uniques +(23 rows) SET hll.force_groupagg to ON; EXPLAIN(COSTS OFF) @@ -294,36 +280,32 @@ SELECT FROM daily_uniques GROUP BY(1); - QUERY PLAN ------------------------------------------------------------------------------- - GroupAggregate - Group Key: remote_scan.day - -> Sort - Sort Key: remote_scan.day - -> Custom Scan (Citus Adaptive) - Task Count: 4 - Tasks Shown: All - -> Task - Node: host=localhost port=57637 dbname=regression - -> HashAggregate - Group Key: day - -> Seq Scan on daily_uniques_360615 daily_uniques - -> Task - Node: host=localhost port=57638 dbname=regression - -> HashAggregate - Group Key: day - -> Seq Scan on daily_uniques_360616 daily_uniques - -> Task - Node: host=localhost port=57637 dbname=regression - -> HashAggregate - Group Key: day - -> Seq Scan on daily_uniques_360617 daily_uniques - -> Task - Node: host=localhost port=57638 dbname=regression - -> HashAggregate - Group Key: day - -> Seq Scan on daily_uniques_360618 daily_uniques -(27 rows) + QUERY PLAN +------------------------------------------------------------------ + Custom Scan (Citus Adaptive) + Task Count: 4 + Tasks Shown: All + -> Task + Node: host=localhost port=57637 dbname=regression + -> HashAggregate + Group Key: day + -> Seq Scan on daily_uniques_360615 daily_uniques + -> Task + Node: host=localhost port=57638 dbname=regression + -> HashAggregate + Group Key: day + -> Seq Scan on daily_uniques_360616 daily_uniques + -> Task + Node: host=localhost port=57637 dbname=regression + -> HashAggregate + Group Key: day + -> Seq Scan on daily_uniques_360617 daily_uniques + -> Task + Node: host=localhost port=57638 dbname=regression + -> HashAggregate + Group Key: day + -> Seq Scan on daily_uniques_360618 daily_uniques +(23 rows) -- Test disabling hash_agg with having SET hll.force_groupagg to OFF; @@ -333,34 +315,32 @@ SELECT FROM daily_uniques GROUP BY(1); - QUERY PLAN ------------------------------------------------------------------------- - HashAggregate - Group Key: remote_scan.day - -> Custom Scan (Citus Adaptive) - Task Count: 4 - Tasks Shown: All - -> Task - Node: host=localhost port=57637 dbname=regression - -> HashAggregate - Group Key: day - -> Seq Scan on daily_uniques_360615 daily_uniques - -> Task - Node: host=localhost port=57638 dbname=regression - -> HashAggregate - Group Key: day - -> Seq Scan on daily_uniques_360616 daily_uniques - -> Task - Node: host=localhost port=57637 dbname=regression - -> HashAggregate - Group Key: day - -> Seq Scan on daily_uniques_360617 daily_uniques - -> Task - Node: host=localhost port=57638 dbname=regression - -> HashAggregate - Group Key: day - -> Seq Scan on daily_uniques_360618 daily_uniques -(25 rows) + QUERY PLAN +------------------------------------------------------------------ + Custom Scan (Citus Adaptive) + Task Count: 4 + Tasks Shown: All + -> Task + Node: host=localhost port=57637 dbname=regression + -> HashAggregate + Group Key: day + -> Seq Scan on daily_uniques_360615 daily_uniques + -> Task + Node: host=localhost port=57638 dbname=regression + -> HashAggregate + Group Key: day + -> Seq Scan on daily_uniques_360616 daily_uniques + -> Task + Node: host=localhost port=57637 dbname=regression + -> HashAggregate + Group Key: day + -> Seq Scan on daily_uniques_360617 daily_uniques + -> Task + Node: host=localhost port=57638 dbname=regression + -> HashAggregate + Group Key: day + -> Seq Scan on daily_uniques_360618 daily_uniques +(23 rows) SET hll.force_groupagg to ON; EXPLAIN(COSTS OFF) @@ -370,49 +350,36 @@ FROM daily_uniques GROUP BY(1) HAVING hll_cardinality(hll_union_agg(unique_users)) > 1; - QUERY PLAN ----------------------------------------------------------------------------------------------------------- - GroupAggregate - Group Key: remote_scan.day - Filter: (hll_cardinality(hll_union_agg(remote_scan.worker_column_3)) > '1'::double precision) - -> Sort - Sort Key: remote_scan.day - -> Custom Scan (Citus Adaptive) - Task Count: 4 - Tasks Shown: All - -> Task - Node: host=localhost port=57637 dbname=regression - -> GroupAggregate - Group Key: day - Filter: (hll_cardinality(hll_union_agg(unique_users)) > '1'::double precision) - -> Sort - Sort Key: day - -> Seq Scan on daily_uniques_360615 daily_uniques - -> Task - Node: host=localhost port=57638 dbname=regression - -> GroupAggregate - Group Key: day - Filter: (hll_cardinality(hll_union_agg(unique_users)) > '1'::double precision) - -> Sort - Sort Key: day - -> Seq Scan on daily_uniques_360616 daily_uniques - -> Task - Node: host=localhost port=57637 dbname=regression - -> GroupAggregate - Group Key: day - Filter: (hll_cardinality(hll_union_agg(unique_users)) > '1'::double precision) - -> Sort - Sort Key: day - -> Seq Scan on daily_uniques_360617 daily_uniques - -> Task - Node: host=localhost port=57638 dbname=regression - -> GroupAggregate - Group Key: day - Filter: (hll_cardinality(hll_union_agg(unique_users)) > '1'::double precision) - -> Sort - Sort Key: day - -> Seq Scan on daily_uniques_360618 daily_uniques -(40 rows) + QUERY PLAN +---------------------------------------------------------------------------------------------- + Custom Scan (Citus Adaptive) + Task Count: 4 + Tasks Shown: All + -> Task + Node: host=localhost port=57637 dbname=regression + -> HashAggregate + Group Key: day + Filter: (hll_cardinality(hll_union_agg(unique_users)) > '1'::double precision) + -> Seq Scan on daily_uniques_360615 daily_uniques + -> Task + Node: host=localhost port=57638 dbname=regression + -> HashAggregate + Group Key: day + Filter: (hll_cardinality(hll_union_agg(unique_users)) > '1'::double precision) + -> Seq Scan on daily_uniques_360616 daily_uniques + -> Task + Node: host=localhost port=57637 dbname=regression + -> HashAggregate + Group Key: day + Filter: (hll_cardinality(hll_union_agg(unique_users)) > '1'::double precision) + -> Seq Scan on daily_uniques_360617 daily_uniques + -> Task + Node: host=localhost port=57638 dbname=regression + -> HashAggregate + Group Key: day + Filter: (hll_cardinality(hll_union_agg(unique_users)) > '1'::double precision) + -> Seq Scan on daily_uniques_360618 daily_uniques +(27 rows) DROP TABLE raw_table; DROP TABLE daily_uniques; diff --git a/src/test/regress/expected/multi_having_pushdown.out b/src/test/regress/expected/multi_having_pushdown.out index 9f676d8e0..9b072a0a3 100644 --- a/src/test/regress/expected/multi_having_pushdown.out +++ b/src/test/regress/expected/multi_having_pushdown.out @@ -22,27 +22,24 @@ EXPLAIN (COSTS FALSE) FROM lineitem_hash GROUP BY l_orderkey HAVING sum(l_quantity) > 24 ORDER BY 2 DESC, 1 ASC LIMIT 3; - QUERY PLAN --------------------------------------------------------------------------------------------------------- + QUERY PLAN +-------------------------------------------------------------------------------------------------- Limit -> Sort - Sort Key: (sum(remote_scan.revenue)) DESC, remote_scan.l_orderkey - -> HashAggregate - Group Key: remote_scan.l_orderkey - Filter: (sum(remote_scan.worker_column_3) > '24'::numeric) - -> Custom Scan (Citus Adaptive) - Task Count: 4 - Tasks Shown: One of 4 - -> Task - Node: host=localhost port=57637 dbname=regression - -> Limit - -> Sort - Sort Key: (sum((l_extendedprice * l_discount))) DESC, l_orderkey - -> HashAggregate - Group Key: l_orderkey - Filter: (sum(l_quantity) > '24'::numeric) - -> Seq Scan on lineitem_hash_590000 lineitem_hash -(18 rows) + Sort Key: remote_scan.revenue DESC, remote_scan.l_orderkey + -> Custom Scan (Citus Adaptive) + Task Count: 4 + Tasks Shown: One of 4 + -> Task + Node: host=localhost port=57637 dbname=regression + -> Limit + -> Sort + Sort Key: (sum((l_extendedprice * l_discount))) DESC, l_orderkey + -> HashAggregate + Group Key: l_orderkey + Filter: (sum(l_quantity) > '24'::numeric) + -> Seq Scan on lineitem_hash_590000 lineitem_hash +(15 rows) -- but don't push down when table is distributed by append EXPLAIN (COSTS FALSE) @@ -98,27 +95,24 @@ EXPLAIN (COSTS FALSE) FROM lineitem_hash GROUP BY l_shipmode, l_orderkey HAVING sum(l_quantity) > 24 ORDER BY 3 DESC, 1, 2 LIMIT 3; - QUERY PLAN --------------------------------------------------------------------------------------------------------------------- + QUERY PLAN +-------------------------------------------------------------------------------------------------------------- Limit -> Sort - Sort Key: (sum(remote_scan.revenue)) DESC, remote_scan.l_shipmode, remote_scan.l_orderkey - -> HashAggregate - Group Key: remote_scan.l_shipmode, remote_scan.l_orderkey - Filter: (sum(remote_scan.worker_column_4) > '24'::numeric) - -> Custom Scan (Citus Adaptive) - Task Count: 4 - Tasks Shown: One of 4 - -> Task - Node: host=localhost port=57637 dbname=regression - -> Limit - -> Sort - Sort Key: (sum((l_extendedprice * l_discount))) DESC, l_shipmode, l_orderkey - -> HashAggregate - Group Key: l_shipmode, l_orderkey - Filter: (sum(l_quantity) > '24'::numeric) - -> Seq Scan on lineitem_hash_590000 lineitem_hash -(18 rows) + Sort Key: remote_scan.revenue DESC, remote_scan.l_shipmode, remote_scan.l_orderkey + -> Custom Scan (Citus Adaptive) + Task Count: 4 + Tasks Shown: One of 4 + -> Task + Node: host=localhost port=57637 dbname=regression + -> Limit + -> Sort + Sort Key: (sum((l_extendedprice * l_discount))) DESC, l_shipmode, l_orderkey + -> HashAggregate + Group Key: l_shipmode, l_orderkey + Filter: (sum(l_quantity) > '24'::numeric) + -> Seq Scan on lineitem_hash_590000 lineitem_hash +(15 rows) -- couple more checks with joins EXPLAIN (COSTS FALSE) @@ -127,31 +121,28 @@ EXPLAIN (COSTS FALSE) WHERE o_orderkey = l_orderkey GROUP BY l_orderkey, o_orderkey, l_shipmode HAVING sum(l_quantity) > 24 ORDER BY 1 DESC LIMIT 3; - QUERY PLAN ------------------------------------------------------------------------------------------------------------------------------------ + QUERY PLAN +----------------------------------------------------------------------------------------------------------------------------- Limit -> Sort - Sort Key: (sum(remote_scan.revenue)) DESC - -> HashAggregate - Group Key: remote_scan.worker_column_2, remote_scan.worker_column_3, remote_scan.worker_column_4 - Filter: (sum(remote_scan.worker_column_5) > '24'::numeric) - -> Custom Scan (Citus Adaptive) - Task Count: 4 - Tasks Shown: One of 4 - -> Task - Node: host=localhost port=57637 dbname=regression - -> Limit - -> Sort - Sort Key: (sum((lineitem_hash.l_extendedprice * lineitem_hash.l_discount))) DESC - -> HashAggregate - Group Key: lineitem_hash.l_orderkey, orders_hash.o_orderkey, lineitem_hash.l_shipmode - Filter: (sum(lineitem_hash.l_quantity) > '24'::numeric) - -> Hash Join - Hash Cond: (orders_hash.o_orderkey = lineitem_hash.l_orderkey) - -> Seq Scan on orders_hash_590004 orders_hash - -> Hash - -> Seq Scan on lineitem_hash_590000 lineitem_hash -(22 rows) + Sort Key: remote_scan.revenue DESC + -> Custom Scan (Citus Adaptive) + Task Count: 4 + Tasks Shown: One of 4 + -> Task + Node: host=localhost port=57637 dbname=regression + -> Limit + -> Sort + Sort Key: (sum((lineitem_hash.l_extendedprice * lineitem_hash.l_discount))) DESC + -> HashAggregate + Group Key: lineitem_hash.l_orderkey, orders_hash.o_orderkey, lineitem_hash.l_shipmode + Filter: (sum(lineitem_hash.l_quantity) > '24'::numeric) + -> Hash Join + Hash Cond: (orders_hash.o_orderkey = lineitem_hash.l_orderkey) + -> Seq Scan on orders_hash_590004 orders_hash + -> Hash + -> Seq Scan on lineitem_hash_590000 lineitem_hash +(19 rows) EXPLAIN (COSTS FALSE) SELECT sum(l_extendedprice * l_discount) as revenue @@ -183,9 +174,9 @@ EXPLAIN (COSTS FALSE) DROP TABLE lineitem_hash; DROP TABLE orders_hash; -SELECT max(value_1) -FROM users_table -GROUP BY user_id +SELECT max(value_1) +FROM users_table +GROUP BY user_id HAVING max(value_2) > 4 AND min(value_2) < 1 ORDER BY 1; max @@ -195,9 +186,9 @@ ORDER BY 1; 5 (3 rows) -SELECT max(value_1) -FROM users_table -GROUP BY user_id +SELECT max(value_1) +FROM users_table +GROUP BY user_id HAVING max(value_2) > 4 AND min(value_2) < 1 OR count(*) > 10 ORDER BY 1; max @@ -208,9 +199,9 @@ ORDER BY 1; 5 (4 rows) -SELECT max(value_1) -FROM users_table -GROUP BY user_id +SELECT max(value_1) +FROM users_table +GROUP BY user_id HAVING max(value_2) > 4 AND min(value_2) < 1 AND count(*) > 20 ORDER BY 1; max @@ -219,9 +210,9 @@ ORDER BY 1; 5 (2 rows) -SELECT max(value_1) -FROM users_table -GROUP BY user_id +SELECT max(value_1) +FROM users_table +GROUP BY user_id HAVING max(value_2) > 0 AND count(*) FILTER (WHERE value_3=2) > 3 AND min(value_2) IN (0,1,2,3); max ----- diff --git a/src/test/regress/expected/multi_orderby_limit_pushdown.out b/src/test/regress/expected/multi_orderby_limit_pushdown.out index fecd18e1f..432dd1f2f 100644 --- a/src/test/regress/expected/multi_orderby_limit_pushdown.out +++ b/src/test/regress/expected/multi_orderby_limit_pushdown.out @@ -34,25 +34,23 @@ FROM users_table GROUP BY user_id ORDER BY avg(value_1) DESC LIMIT 1; - QUERY PLAN --------------------------------------------------------------------------------------------------------------------------------- + QUERY PLAN +-------------------------------------------------------------------------------------------------------------------------- Limit (cost=0.00..0.00 rows=0 width=0) -> Sort (cost=0.00..0.00 rows=0 width=0) - Sort Key: ((pg_catalog.sum(remote_scan.avg) / pg_catalog.sum(remote_scan.avg_1))) DESC - -> HashAggregate (cost=0.00..0.00 rows=0 width=0) - Group Key: remote_scan.user_id - -> Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=0 width=0) - Task Count: 4 - Tasks Shown: One of 4 - -> Task - Node: host=localhost port=57637 dbname=regression - -> Limit (cost=1.70..1.70 rows=1 width=52) - -> Sort (cost=1.70..1.70 rows=2 width=52) - Sort Key: (avg(value_1)) DESC - -> HashAggregate (cost=1.66..1.69 rows=2 width=52) - Group Key: user_id - -> Seq Scan on users_table_1400256 users_table (cost=0.00..1.33 rows=33 width=8) -(16 rows) + Sort Key: remote_scan.avg DESC + -> Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=0 width=0) + Task Count: 4 + Tasks Shown: One of 4 + -> Task + Node: host=localhost port=57637 dbname=regression + -> Limit (cost=1.53..1.53 rows=1 width=36) + -> Sort (cost=1.53..1.53 rows=2 width=36) + Sort Key: (avg(value_1)) DESC + -> HashAggregate (cost=1.50..1.52 rows=2 width=36) + Group Key: user_id + -> Seq Scan on users_table_1400256 users_table (cost=0.00..1.33 rows=33 width=8) +(14 rows) SELECT user_id, avg(value_1) + 1 FROM users_table @@ -98,26 +96,24 @@ ORDER BY 2 DESC; 1 | 10.2857142857142857 (6 rows) -EXPLAIN +EXPLAIN SELECT user_id, avg(value_1) + count(value_2) FROM users_table GROUP BY user_id ORDER BY 2 DESC; - QUERY PLAN -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- + QUERY PLAN +--------------------------------------------------------------------------------------------------------- Sort (cost=0.00..0.00 rows=0 width=0) - Sort Key: (((pg_catalog.sum(remote_scan."?column?") / pg_catalog.sum(remote_scan."?column?_1")) + (COALESCE((pg_catalog.sum(remote_scan."?column?_2"))::bigint, '0'::bigint))::numeric)) DESC - -> HashAggregate (cost=0.00..0.00 rows=0 width=0) - Group Key: remote_scan.user_id - -> Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=0 width=0) - Task Count: 4 - Tasks Shown: One of 4 - -> Task - Node: host=localhost port=57637 dbname=regression - -> HashAggregate (cost=1.66..1.68 rows=2 width=28) - Group Key: user_id - -> Seq Scan on users_table_1400256 users_table (cost=0.00..1.33 rows=33 width=12) -(12 rows) + Sort Key: remote_scan."?column?" DESC + -> Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=0 width=0) + Task Count: 4 + Tasks Shown: One of 4 + -> Task + Node: host=localhost port=57637 dbname=regression + -> HashAggregate (cost=1.58..1.61 rows=2 width=36) + Group Key: user_id + -> Seq Scan on users_table_1400256 users_table (cost=0.00..1.33 rows=33 width=12) +(10 rows) SELECT user_id, avg(value_1) + count(value_2) FROM users_table @@ -221,25 +217,23 @@ FROM users_table GROUP BY user_id ORDER BY (10000 / (sum(value_1 + value_2))) DESC LIMIT 2; - QUERY PLAN ---------------------------------------------------------------------------------------------- + QUERY PLAN +--------------------------------------------------------------------------------------- Limit -> Sort - Sort Key: ((10000 / (pg_catalog.sum(remote_scan.worker_column_2))::bigint)) DESC - -> HashAggregate - Group Key: remote_scan.user_id - -> Custom Scan (Citus Adaptive) - Task Count: 4 - Tasks Shown: One of 4 - -> Task - Node: host=localhost port=57637 dbname=regression - -> Limit - -> Sort - Sort Key: ((10000 / sum((value_1 + value_2)))) DESC - -> HashAggregate - Group Key: user_id - -> Seq Scan on users_table_1400256 users_table -(16 rows) + Sort Key: remote_scan.worker_column_2 DESC + -> Custom Scan (Citus Adaptive) + Task Count: 4 + Tasks Shown: One of 4 + -> Task + Node: host=localhost port=57637 dbname=regression + -> Limit + -> Sort + Sort Key: ((10000 / sum((value_1 + value_2)))) DESC + -> HashAggregate + Group Key: user_id + -> Seq Scan on users_table_1400256 users_table +(14 rows) SELECT 10000 / (sum(value_1 + value_2)) FROM users_table @@ -289,25 +283,23 @@ FROM users_table GROUP BY user_id ORDER BY sum(value_1) DESC LIMIT 2; - QUERY PLAN ---------------------------------------------------------------------------------------------- + QUERY PLAN +--------------------------------------------------------------------------------------- Limit -> Sort - Sort Key: ((pg_catalog.sum(remote_scan.worker_column_2))::bigint) DESC - -> HashAggregate - Group Key: remote_scan.user_id - -> Custom Scan (Citus Adaptive) - Task Count: 4 - Tasks Shown: One of 4 - -> Task - Node: host=localhost port=57637 dbname=regression - -> Limit - -> Sort - Sort Key: (sum(value_1)) DESC - -> HashAggregate - Group Key: user_id - -> Seq Scan on users_table_1400256 users_table -(16 rows) + Sort Key: remote_scan.worker_column_2 DESC + -> Custom Scan (Citus Adaptive) + Task Count: 4 + Tasks Shown: One of 4 + -> Task + Node: host=localhost port=57637 dbname=regression + -> Limit + -> Sort + Sort Key: (sum(value_1)) DESC + -> HashAggregate + Group Key: user_id + -> Seq Scan on users_table_1400256 users_table +(14 rows) SELECT ut.user_id, avg(ut.value_2) FROM users_table ut, events_table et @@ -331,30 +323,28 @@ WHERE ut.user_id = et.user_id and et.value_2 < 5 GROUP BY ut.user_id ORDER BY MAX(et.time), AVG(ut.value_1) LIMIT 5; - QUERY PLAN ------------------------------------------------------------------------------------------------------------------------------------------------------ + QUERY PLAN +------------------------------------------------------------------------------------------- Limit -> Sort - Sort Key: (max(remote_scan.worker_column_4)), ((pg_catalog.sum(remote_scan.worker_column_5) / pg_catalog.sum(remote_scan.worker_column_6))) - -> HashAggregate - Group Key: remote_scan.user_id - -> Custom Scan (Citus Adaptive) - Task Count: 4 - Tasks Shown: One of 4 - -> Task - Node: host=localhost port=57637 dbname=regression - -> Limit - -> Sort - Sort Key: (max(et."time")), (avg(ut.value_1)) - -> HashAggregate - Group Key: ut.user_id - -> Hash Join - Hash Cond: (ut.user_id = et.user_id) - -> Seq Scan on users_table_1400256 ut - -> Hash - -> Seq Scan on events_table_1400260 et - Filter: (value_2 < 5) -(21 rows) + Sort Key: remote_scan.worker_column_3, remote_scan.worker_column_4 + -> Custom Scan (Citus Adaptive) + Task Count: 4 + Tasks Shown: One of 4 + -> Task + Node: host=localhost port=57637 dbname=regression + -> Limit + -> Sort + Sort Key: (max(et."time")), (avg(ut.value_1)) + -> HashAggregate + Group Key: ut.user_id + -> Hash Join + Hash Cond: (ut.user_id = et.user_id) + -> Seq Scan on users_table_1400256 ut + -> Hash + -> Seq Scan on events_table_1400260 et + Filter: (value_2 < 5) +(19 rows) SELECT ut.user_id, avg(et.value_2) FROM users_table ut, events_table et @@ -390,30 +380,28 @@ WHERE ut.user_id = et.user_id and et.value_2 < 5 GROUP BY ut.user_id ORDER BY 2, AVG(ut.value_1), 1 DESC LIMIT 5; - QUERY PLAN ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ + QUERY PLAN +------------------------------------------------------------------------------------------------------------ Limit -> Sort - Sort Key: (COALESCE((pg_catalog.sum(remote_scan.count))::bigint, '0'::bigint)), ((pg_catalog.sum(remote_scan.worker_column_3) / pg_catalog.sum(remote_scan.worker_column_4))), remote_scan.user_id DESC - -> HashAggregate - Group Key: remote_scan.user_id - -> Custom Scan (Citus Adaptive) - Task Count: 4 - Tasks Shown: One of 4 - -> Task - Node: host=localhost port=57637 dbname=regression - -> Limit - -> Sort - Sort Key: (count(DISTINCT ut.value_2)), (avg(ut.value_1)), ut.user_id DESC - -> GroupAggregate - Group Key: ut.user_id - -> Sort - Sort Key: ut.user_id DESC - -> Hash Join - Hash Cond: (ut.user_id = et.user_id) - -> Seq Scan on users_table_1400256 ut - -> Hash - -> Seq Scan on events_table_1400260 et - Filter: (value_2 < 5) -(23 rows) + Sort Key: remote_scan.count, remote_scan.worker_column_3, remote_scan.user_id DESC + -> Custom Scan (Citus Adaptive) + Task Count: 4 + Tasks Shown: One of 4 + -> Task + Node: host=localhost port=57637 dbname=regression + -> Limit + -> Sort + Sort Key: (count(DISTINCT ut.value_2)), (avg(ut.value_1)), ut.user_id DESC + -> GroupAggregate + Group Key: ut.user_id + -> Sort + Sort Key: ut.user_id DESC + -> Hash Join + Hash Cond: (ut.user_id = et.user_id) + -> Seq Scan on users_table_1400256 ut + -> Hash + -> Seq Scan on events_table_1400260 et + Filter: (value_2 < 5) +(21 rows) diff --git a/src/test/regress/expected/multi_repartition_join_planning.out b/src/test/regress/expected/multi_repartition_join_planning.out index 5fd294540..7290429d4 100644 --- a/src/test/regress/expected/multi_repartition_join_planning.out +++ b/src/test/regress/expected/multi_repartition_join_planning.out @@ -461,10 +461,10 @@ DEBUG: generated sql query for task 3 DETAIL: query string: "SELECT s_i_id, s_w_id, s_quantity FROM stock_690006 stock WHERE true" DEBUG: generated sql query for task 4 DETAIL: query string: "SELECT s_i_id, s_w_id, s_quantity FROM stock_690007 stock WHERE true" -DEBUG: assigned task 1 to node localhost:57637 -DEBUG: assigned task 2 to node localhost:57638 -DEBUG: assigned task 3 to node localhost:57637 -DEBUG: assigned task 4 to node localhost:57638 +DEBUG: assigned task 2 to node localhost:57637 +DEBUG: assigned task 1 to node localhost:57638 +DEBUG: assigned task 4 to node localhost:57637 +DEBUG: assigned task 3 to node localhost:57638 DEBUG: generated sql query for task 1 DETAIL: query string: "SELECT ol_i_id FROM order_line_690000 order_line WHERE true" DEBUG: generated sql query for task 2 @@ -490,13 +490,13 @@ DEBUG: join prunable for task partitionId 3 and 0 DEBUG: join prunable for task partitionId 3 and 1 DEBUG: join prunable for task partitionId 3 and 2 DEBUG: generated sql query for task 3 -DETAIL: query string: "SELECT "pg_merge_job_0016.task_000005".intermediate_column_16_0 AS s_i_id, "pg_merge_job_0016.task_000005".intermediate_column_16_1 AS worker_column_2, any_value("pg_merge_job_0016.task_000005".intermediate_column_16_2) AS worker_column_3, any_value("pg_merge_job_0016.task_000005".intermediate_column_16_2) AS worker_column_4 FROM (pg_merge_job_0016.task_000005 "pg_merge_job_0016.task_000005" JOIN pg_merge_job_0017.task_000005 "pg_merge_job_0017.task_000005" ON (("pg_merge_job_0017.task_000005".intermediate_column_17_0 OPERATOR(pg_catalog.=) "pg_merge_job_0016.task_000005".intermediate_column_16_0))) WHERE true GROUP BY "pg_merge_job_0016.task_000005".intermediate_column_16_0, "pg_merge_job_0016.task_000005".intermediate_column_16_1 HAVING ((any_value("pg_merge_job_0016.task_000005".intermediate_column_16_2))::double precision OPERATOR(pg_catalog.>) random())" +DETAIL: query string: "SELECT "pg_merge_job_0016.task_000005".intermediate_column_16_0 AS s_i_id, "pg_merge_job_0016.task_000005".intermediate_column_16_1 AS worker_column_2, any_value("pg_merge_job_0016.task_000005".intermediate_column_16_2) AS worker_column_3 FROM (pg_merge_job_0016.task_000005 "pg_merge_job_0016.task_000005" JOIN pg_merge_job_0017.task_000005 "pg_merge_job_0017.task_000005" ON (("pg_merge_job_0017.task_000005".intermediate_column_17_0 OPERATOR(pg_catalog.=) "pg_merge_job_0016.task_000005".intermediate_column_16_0))) WHERE true GROUP BY "pg_merge_job_0016.task_000005".intermediate_column_16_0, "pg_merge_job_0016.task_000005".intermediate_column_16_1 HAVING ((any_value("pg_merge_job_0016.task_000005".intermediate_column_16_2))::double precision OPERATOR(pg_catalog.>) random())" DEBUG: generated sql query for task 6 -DETAIL: query string: "SELECT "pg_merge_job_0016.task_000010".intermediate_column_16_0 AS s_i_id, "pg_merge_job_0016.task_000010".intermediate_column_16_1 AS worker_column_2, any_value("pg_merge_job_0016.task_000010".intermediate_column_16_2) AS worker_column_3, any_value("pg_merge_job_0016.task_000010".intermediate_column_16_2) AS worker_column_4 FROM (pg_merge_job_0016.task_000010 "pg_merge_job_0016.task_000010" JOIN pg_merge_job_0017.task_000010 "pg_merge_job_0017.task_000010" ON (("pg_merge_job_0017.task_000010".intermediate_column_17_0 OPERATOR(pg_catalog.=) "pg_merge_job_0016.task_000010".intermediate_column_16_0))) WHERE true GROUP BY "pg_merge_job_0016.task_000010".intermediate_column_16_0, "pg_merge_job_0016.task_000010".intermediate_column_16_1 HAVING ((any_value("pg_merge_job_0016.task_000010".intermediate_column_16_2))::double precision OPERATOR(pg_catalog.>) random())" +DETAIL: query string: "SELECT "pg_merge_job_0016.task_000010".intermediate_column_16_0 AS s_i_id, "pg_merge_job_0016.task_000010".intermediate_column_16_1 AS worker_column_2, any_value("pg_merge_job_0016.task_000010".intermediate_column_16_2) AS worker_column_3 FROM (pg_merge_job_0016.task_000010 "pg_merge_job_0016.task_000010" JOIN pg_merge_job_0017.task_000010 "pg_merge_job_0017.task_000010" ON (("pg_merge_job_0017.task_000010".intermediate_column_17_0 OPERATOR(pg_catalog.=) "pg_merge_job_0016.task_000010".intermediate_column_16_0))) WHERE true GROUP BY "pg_merge_job_0016.task_000010".intermediate_column_16_0, "pg_merge_job_0016.task_000010".intermediate_column_16_1 HAVING ((any_value("pg_merge_job_0016.task_000010".intermediate_column_16_2))::double precision OPERATOR(pg_catalog.>) random())" DEBUG: generated sql query for task 9 -DETAIL: query string: "SELECT "pg_merge_job_0016.task_000015".intermediate_column_16_0 AS s_i_id, "pg_merge_job_0016.task_000015".intermediate_column_16_1 AS worker_column_2, any_value("pg_merge_job_0016.task_000015".intermediate_column_16_2) AS worker_column_3, any_value("pg_merge_job_0016.task_000015".intermediate_column_16_2) AS worker_column_4 FROM (pg_merge_job_0016.task_000015 "pg_merge_job_0016.task_000015" JOIN pg_merge_job_0017.task_000015 "pg_merge_job_0017.task_000015" ON (("pg_merge_job_0017.task_000015".intermediate_column_17_0 OPERATOR(pg_catalog.=) "pg_merge_job_0016.task_000015".intermediate_column_16_0))) WHERE true GROUP BY "pg_merge_job_0016.task_000015".intermediate_column_16_0, "pg_merge_job_0016.task_000015".intermediate_column_16_1 HAVING ((any_value("pg_merge_job_0016.task_000015".intermediate_column_16_2))::double precision OPERATOR(pg_catalog.>) random())" +DETAIL: query string: "SELECT "pg_merge_job_0016.task_000015".intermediate_column_16_0 AS s_i_id, "pg_merge_job_0016.task_000015".intermediate_column_16_1 AS worker_column_2, any_value("pg_merge_job_0016.task_000015".intermediate_column_16_2) AS worker_column_3 FROM (pg_merge_job_0016.task_000015 "pg_merge_job_0016.task_000015" JOIN pg_merge_job_0017.task_000015 "pg_merge_job_0017.task_000015" ON (("pg_merge_job_0017.task_000015".intermediate_column_17_0 OPERATOR(pg_catalog.=) "pg_merge_job_0016.task_000015".intermediate_column_16_0))) WHERE true GROUP BY "pg_merge_job_0016.task_000015".intermediate_column_16_0, "pg_merge_job_0016.task_000015".intermediate_column_16_1 HAVING ((any_value("pg_merge_job_0016.task_000015".intermediate_column_16_2))::double precision OPERATOR(pg_catalog.>) random())" DEBUG: generated sql query for task 12 -DETAIL: query string: "SELECT "pg_merge_job_0016.task_000020".intermediate_column_16_0 AS s_i_id, "pg_merge_job_0016.task_000020".intermediate_column_16_1 AS worker_column_2, any_value("pg_merge_job_0016.task_000020".intermediate_column_16_2) AS worker_column_3, any_value("pg_merge_job_0016.task_000020".intermediate_column_16_2) AS worker_column_4 FROM (pg_merge_job_0016.task_000020 "pg_merge_job_0016.task_000020" JOIN pg_merge_job_0017.task_000020 "pg_merge_job_0017.task_000020" ON (("pg_merge_job_0017.task_000020".intermediate_column_17_0 OPERATOR(pg_catalog.=) "pg_merge_job_0016.task_000020".intermediate_column_16_0))) WHERE true GROUP BY "pg_merge_job_0016.task_000020".intermediate_column_16_0, "pg_merge_job_0016.task_000020".intermediate_column_16_1 HAVING ((any_value("pg_merge_job_0016.task_000020".intermediate_column_16_2))::double precision OPERATOR(pg_catalog.>) random())" +DETAIL: query string: "SELECT "pg_merge_job_0016.task_000020".intermediate_column_16_0 AS s_i_id, "pg_merge_job_0016.task_000020".intermediate_column_16_1 AS worker_column_2, any_value("pg_merge_job_0016.task_000020".intermediate_column_16_2) AS worker_column_3 FROM (pg_merge_job_0016.task_000020 "pg_merge_job_0016.task_000020" JOIN pg_merge_job_0017.task_000020 "pg_merge_job_0017.task_000020" ON (("pg_merge_job_0017.task_000020".intermediate_column_17_0 OPERATOR(pg_catalog.=) "pg_merge_job_0016.task_000020".intermediate_column_16_0))) WHERE true GROUP BY "pg_merge_job_0016.task_000020".intermediate_column_16_0, "pg_merge_job_0016.task_000020".intermediate_column_16_1 HAVING ((any_value("pg_merge_job_0016.task_000020".intermediate_column_16_2))::double precision OPERATOR(pg_catalog.>) random())" DEBUG: pruning merge fetch taskId 1 DETAIL: Creating dependency on merge taskId 5 DEBUG: pruning merge fetch taskId 2 diff --git a/src/test/regress/expected/multi_select_distinct.out b/src/test/regress/expected/multi_select_distinct.out index 642775211..840e2895a 100644 --- a/src/test/regress/expected/multi_select_distinct.out +++ b/src/test/regress/expected/multi_select_distinct.out @@ -207,13 +207,12 @@ EXPLAIN (COSTS FALSE) GROUP BY 1 HAVING count(*) > 5 ORDER BY 2 DESC, 1; - QUERY PLAN ---------------------------------------------------------------------------------------------------------------- + QUERY PLAN +---------------------------------------------------------------------------------------- Sort - Sort Key: (COALESCE((pg_catalog.sum(remote_scan.count))::bigint, '0'::bigint)) DESC, remote_scan.l_orderkey + Sort Key: remote_scan.count DESC, remote_scan.l_orderkey -> HashAggregate - Group Key: remote_scan.l_orderkey - Filter: (COALESCE((pg_catalog.sum(remote_scan.worker_column_3))::bigint, '0'::bigint) > 5) + Group Key: remote_scan.count, remote_scan.l_orderkey -> Custom Scan (Citus Adaptive) Task Count: 4 Tasks Shown: One of 4 @@ -224,7 +223,7 @@ EXPLAIN (COSTS FALSE) Filter: (count(*) > 5) -> Seq Scan on lineitem_hash_part_360041 lineitem_hash_part Filter: (l_orderkey < 200) -(15 rows) +(14 rows) -- check the plan if the hash aggreate is disabled SET enable_hashagg TO off; @@ -235,15 +234,13 @@ EXPLAIN (COSTS FALSE) GROUP BY 1 HAVING count(*) > 5 ORDER BY 2 DESC, 1; - QUERY PLAN ---------------------------------------------------------------------------------------------------------------- + QUERY PLAN +---------------------------------------------------------------------------------------------- Sort - Sort Key: (COALESCE((pg_catalog.sum(remote_scan.count))::bigint, '0'::bigint)) DESC, remote_scan.l_orderkey - -> GroupAggregate - Group Key: remote_scan.l_orderkey - Filter: (COALESCE((pg_catalog.sum(remote_scan.worker_column_3))::bigint, '0'::bigint) > 5) + Sort Key: remote_scan.count DESC, remote_scan.l_orderkey + -> Unique -> Sort - Sort Key: remote_scan.l_orderkey + Sort Key: remote_scan.count DESC, remote_scan.l_orderkey -> Custom Scan (Citus Adaptive) Task Count: 4 Tasks Shown: One of 4 @@ -254,7 +251,7 @@ EXPLAIN (COSTS FALSE) Filter: (count(*) > 5) -> Seq Scan on lineitem_hash_part_360041 lineitem_hash_part Filter: (l_orderkey < 200) -(17 rows) +(15 rows) SET enable_hashagg TO on; -- distinct on aggregate of group by columns, we try to check whether we handle @@ -782,8 +779,7 @@ EXPLAIN (COSTS FALSE) Limit -> Sort Sort Key: remote_scan.l_orderkey, remote_scan.l_partkey - -> GroupAggregate - Group Key: remote_scan.l_orderkey, remote_scan.l_partkey, remote_scan.l_suppkey, remote_scan.l_linenumber, remote_scan.l_quantity, remote_scan.l_extendedprice, remote_scan.l_discount, remote_scan.l_tax, remote_scan.l_returnflag, remote_scan.l_linestatus, remote_scan.l_shipdate, remote_scan.l_commitdate, remote_scan.l_receiptdate, remote_scan.l_shipinstruct, remote_scan.l_shipmode, remote_scan.l_comment + -> Unique -> Sort Sort Key: remote_scan.l_orderkey, remote_scan.l_partkey, remote_scan.l_suppkey, remote_scan.l_linenumber, remote_scan.l_quantity, remote_scan.l_extendedprice, remote_scan.l_discount, remote_scan.l_tax, remote_scan.l_returnflag, remote_scan.l_linestatus, remote_scan.l_shipdate, remote_scan.l_commitdate, remote_scan.l_receiptdate, remote_scan.l_shipinstruct, remote_scan.l_shipmode, remote_scan.l_comment -> Custom Scan (Citus Adaptive) @@ -798,7 +794,7 @@ EXPLAIN (COSTS FALSE) -> Sort Sort Key: l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment -> Seq Scan on lineitem_hash_part_360041 lineitem_hash_part -(19 rows) +(18 rows) SET enable_hashagg TO on; -- distinct on count distinct @@ -842,14 +838,39 @@ EXPLAIN (COSTS FALSE) FROM lineitem_hash_part GROUP BY l_orderkey ORDER BY 1,2; - QUERY PLAN -------------------------------------------------------------------------------------------------------------------------------------------------------------- + QUERY PLAN +---------------------------------------------------------------------------------------------- Sort - Sort Key: (COALESCE((pg_catalog.sum(remote_scan.count))::bigint, '0'::bigint)), (COALESCE((pg_catalog.sum(remote_scan.count_1))::bigint, '0'::bigint)) + Sort Key: remote_scan.count, remote_scan.count_1 -> HashAggregate - Group Key: COALESCE((pg_catalog.sum(remote_scan.count))::bigint, '0'::bigint), COALESCE((pg_catalog.sum(remote_scan.count_1))::bigint, '0'::bigint) - -> HashAggregate - Group Key: remote_scan.worker_column_3 + Group Key: remote_scan.count, remote_scan.count_1 + -> Custom Scan (Citus Adaptive) + Task Count: 4 + Tasks Shown: One of 4 + -> Task + Node: host=localhost port=57637 dbname=regression + -> GroupAggregate + Group Key: l_orderkey + -> Sort + Sort Key: l_orderkey + -> Seq Scan on lineitem_hash_part_360041 lineitem_hash_part +(14 rows) + +-- check the plan if the hash aggreate is disabled. We expect to see sort + unique +-- plans for the outer distinct. +SET enable_hashagg TO off; +EXPLAIN (COSTS FALSE) + SELECT DISTINCT count(DISTINCT l_partkey), count(DISTINCT l_shipmode) + FROM lineitem_hash_part + GROUP BY l_orderkey + ORDER BY 1,2; + QUERY PLAN +---------------------------------------------------------------------------------------------------- + Sort + Sort Key: remote_scan.count, remote_scan.count_1 + -> Unique + -> Sort + Sort Key: remote_scan.count, remote_scan.count_1 -> Custom Scan (Citus Adaptive) Task Count: 4 Tasks Shown: One of 4 @@ -860,38 +881,7 @@ EXPLAIN (COSTS FALSE) -> Sort Sort Key: l_orderkey -> Seq Scan on lineitem_hash_part_360041 lineitem_hash_part -(16 rows) - --- check the plan if the hash aggreate is disabled. We expect to see sort + unique --- plans for the outer distinct. -SET enable_hashagg TO off; -EXPLAIN (COSTS FALSE) - SELECT DISTINCT count(DISTINCT l_partkey), count(DISTINCT l_shipmode) - FROM lineitem_hash_part - GROUP BY l_orderkey - ORDER BY 1,2; - QUERY PLAN ----------------------------------------------------------------------------------------------------------------------------------------------------------------------- - Sort - Sort Key: (COALESCE((pg_catalog.sum(remote_scan.count))::bigint, '0'::bigint)), (COALESCE((pg_catalog.sum(remote_scan.count_1))::bigint, '0'::bigint)) - -> Unique - -> Sort - Sort Key: (COALESCE((pg_catalog.sum(remote_scan.count))::bigint, '0'::bigint)), (COALESCE((pg_catalog.sum(remote_scan.count_1))::bigint, '0'::bigint)) - -> GroupAggregate - Group Key: remote_scan.worker_column_3 - -> Sort - Sort Key: remote_scan.worker_column_3 - -> Custom Scan (Citus Adaptive) - Task Count: 4 - Tasks Shown: One of 4 - -> Task - Node: host=localhost port=57637 dbname=regression - -> GroupAggregate - Group Key: l_orderkey - -> Sort - Sort Key: l_orderkey - -> Seq Scan on lineitem_hash_part_360041 lineitem_hash_part -(19 rows) +(15 rows) SET enable_hashagg TO on; -- distinct on aggregation with filter and expression @@ -968,15 +958,41 @@ EXPLAIN (COSTS FALSE) GROUP BY l_orderkey ORDER BY 2 LIMIT 15; - QUERY PLAN -------------------------------------------------------------------------------------------------------------------------- + QUERY PLAN +---------------------------------------------------------------------------------------------------- Limit -> Sort - Sort Key: (array_length(array_cat_agg(remote_scan.array_length), 1)) + Sort Key: remote_scan.array_length -> HashAggregate - Group Key: array_length(array_cat_agg(remote_scan.array_length), 1), array_cat_agg(remote_scan.array_agg) - -> HashAggregate - Group Key: remote_scan.worker_column_3 + Group Key: remote_scan.array_length, remote_scan.array_agg + -> Custom Scan (Citus Adaptive) + Task Count: 4 + Tasks Shown: One of 4 + -> Task + Node: host=localhost port=57637 dbname=regression + -> GroupAggregate + Group Key: l_orderkey + -> Sort + Sort Key: l_orderkey + -> Seq Scan on lineitem_hash_part_360041 lineitem_hash_part +(15 rows) + +-- check the plan if the hash aggreate is disabled. +SET enable_hashagg TO off; +EXPLAIN (COSTS FALSE) + SELECT DISTINCT array_agg(l_linenumber), array_length(array_agg(l_linenumber), 1) + FROM lineitem_hash_part + GROUP BY l_orderkey + ORDER BY 2 + LIMIT 15; + QUERY PLAN +---------------------------------------------------------------------------------------------------------- + Limit + -> Sort + Sort Key: remote_scan.array_length + -> Unique + -> Sort + Sort Key: remote_scan.array_length, remote_scan.array_agg -> Custom Scan (Citus Adaptive) Task Count: 4 Tasks Shown: One of 4 @@ -987,39 +1003,7 @@ EXPLAIN (COSTS FALSE) -> Sort Sort Key: l_orderkey -> Seq Scan on lineitem_hash_part_360041 lineitem_hash_part -(17 rows) - --- check the plan if the hash aggreate is disabled. -SET enable_hashagg TO off; -EXPLAIN (COSTS FALSE) - SELECT DISTINCT array_agg(l_linenumber), array_length(array_agg(l_linenumber), 1) - FROM lineitem_hash_part - GROUP BY l_orderkey - ORDER BY 2 - LIMIT 15; - QUERY PLAN ----------------------------------------------------------------------------------------------------------------------------------- - Limit - -> Sort - Sort Key: (array_length(array_cat_agg(remote_scan.array_length), 1)) - -> Unique - -> Sort - Sort Key: (array_length(array_cat_agg(remote_scan.array_length), 1)), (array_cat_agg(remote_scan.array_agg)) - -> GroupAggregate - Group Key: remote_scan.worker_column_3 - -> Sort - Sort Key: remote_scan.worker_column_3 - -> Custom Scan (Citus Adaptive) - Task Count: 4 - Tasks Shown: One of 4 - -> Task - Node: host=localhost port=57637 dbname=regression - -> GroupAggregate - Group Key: l_orderkey - -> Sort - Sort Key: l_orderkey - -> Seq Scan on lineitem_hash_part_360041 lineitem_hash_part -(20 rows) +(16 rows) SET enable_hashagg TO on; -- distinct on non-partition column with aggregate diff --git a/src/test/regress/expected/multi_view.out b/src/test/regress/expected/multi_view.out index ad85a2cd2..4671a2d80 100644 --- a/src/test/regress/expected/multi_view.out +++ b/src/test/regress/expected/multi_view.out @@ -807,8 +807,8 @@ EXPLAIN (COSTS FALSE) SELECT * (23 rows) EXPLAIN (COSTS FALSE) SELECT et.* FROM recent_10_users JOIN events_table et USING(user_id) ORDER BY et.time DESC LIMIT 10; - QUERY PLAN ---------------------------------------------------------------------------------------------------------------------- + QUERY PLAN +--------------------------------------------------------------------------------------------------------------- Limit -> Sort Sort Key: remote_scan."time" DESC @@ -816,20 +816,18 @@ EXPLAIN (COSTS FALSE) SELECT et.* FROM recent_10_users JOIN events_table et USIN -> Distributed Subplan 90_1 -> Limit -> Sort - Sort Key: (max(remote_scan.lastseen)) DESC - -> HashAggregate - Group Key: remote_scan.user_id - -> Custom Scan (Citus Adaptive) - Task Count: 4 - Tasks Shown: One of 4 - -> Task - Node: host=localhost port=57637 dbname=regression - -> Limit - -> Sort - Sort Key: (max("time")) DESC - -> HashAggregate - Group Key: user_id - -> Seq Scan on users_table_1400256 users_table + Sort Key: remote_scan.lastseen DESC + -> Custom Scan (Citus Adaptive) + Task Count: 4 + Tasks Shown: One of 4 + -> Task + Node: host=localhost port=57637 dbname=regression + -> Limit + -> Sort + Sort Key: (max("time")) DESC + -> HashAggregate + Group Key: user_id + -> Seq Scan on users_table_1400256 users_table Task Count: 4 Tasks Shown: One of 4 -> Task @@ -842,7 +840,7 @@ EXPLAIN (COSTS FALSE) SELECT et.* FROM recent_10_users JOIN events_table et USIN -> Function Scan on read_intermediate_result intermediate_result -> Hash -> Seq Scan on events_table_1400260 et -(33 rows) +(31 rows) SET citus.subquery_pushdown to ON; EXPLAIN (COSTS FALSE) SELECT et.* FROM recent_10_users JOIN events_table et USING(user_id) ORDER BY et.time DESC LIMIT 10; @@ -910,7 +908,7 @@ DELETE FROM small_view; ERROR: cannot modify views over distributed tables INSERT INTO small_view VALUES(8, 5) ON CONFLICT(tenant_id) DO UPDATE SET tenant_id=99; ERROR: cannot modify views over distributed tables --- using views in modify statements' FROM / WHERE clauses is still valid +-- using views in modify statements' FROM / WHERE clauses is still valid UPDATE large SET id=20 FROM small_view WHERE small_view.id=large.id; SELECT * FROM large order by 1, 2; id | tenant_id @@ -924,7 +922,7 @@ SELECT * FROM large order by 1, 2; -- we should still have identical rows for next test statements, then insert new rows to both tables INSERT INTO large VALUES(14, 14); INSERT INTO small VALUES(14, 14); --- using views in subqueries within modify statements is still valid +-- using views in subqueries within modify statements is still valid UPDATE large SET id=23 FROM (SELECT *, id*2 from small_view ORDER BY 1,2 LIMIT 5) as small_view WHERE small_view.id=large.id; SELECT * FROM large order by 1, 2; id | tenant_id @@ -938,8 +936,8 @@ SELECT * FROM large order by 1, 2; -- we should still have identical rows for next test statements, then insert a new row to large table INSERT INTO large VALUES(14, 14); --- using views in modify statements' FROM / WHERE clauses is still valid -UPDATE large SET id=27 FROM small_view WHERE small_view.tenant_id=large.tenant_id; +-- using views in modify statements' FROM / WHERE clauses is still valid +UPDATE large SET id=27 FROM small_view WHERE small_view.tenant_id=large.tenant_id; SELECT * FROM large ORDER BY 1, 2; id | tenant_id ----+----------- @@ -1150,7 +1148,7 @@ SELECT create_distributed_table('small','tenant_id'); CREATE VIEW small_view AS SELECT id, tenant_id FROM (SELECT *, id*2 FROM small WHERE id < 100 ORDER BY 1,2 LIMIT 5) as foo; \copy small FROM STDIN DELIMITER ',' \copy large FROM STDIN DELIMITER ',' --- using views in modify statements' FROM / WHERE clauses is still valid +-- using views in modify statements' FROM / WHERE clauses is still valid UPDATE large SET id=20 FROM small_view WHERE small_view.id=large.id; SELECT * FROM large order by 1, 2; id | tenant_id @@ -1164,7 +1162,7 @@ SELECT * FROM large order by 1, 2; -- we should still have identical rows for next test statements, then insert new rows to both tables INSERT INTO large VALUES(14, 14); INSERT INTO small VALUES(14, 14); --- using views in subqueries within modify statements is still valid +-- using views in subqueries within modify statements is still valid UPDATE large SET id=23 FROM (SELECT *, id*2 from small_view ORDER BY 1,2 LIMIT 5) as small_view WHERE small_view.id=large.id; SELECT * FROM large order by 1, 2; id | tenant_id @@ -1178,8 +1176,8 @@ SELECT * FROM large order by 1, 2; -- we should still have identical rows for next test statements, then insert a new row to large table INSERT INTO large VALUES(14, 14); --- using views in modify statements' FROM / WHERE clauses is still valid -UPDATE large SET id=27 FROM small_view WHERE small_view.tenant_id=large.tenant_id; +-- using views in modify statements' FROM / WHERE clauses is still valid +UPDATE large SET id=27 FROM small_view WHERE small_view.tenant_id=large.tenant_id; SELECT * FROM large ORDER BY 1, 2; id | tenant_id ----+----------- diff --git a/src/test/regress/expected/subqueries_not_supported.out b/src/test/regress/expected/subqueries_not_supported.out index 2ee0aa9cd..3eab8552a 100644 --- a/src/test/regress/expected/subqueries_not_supported.out +++ b/src/test/regress/expected/subqueries_not_supported.out @@ -54,11 +54,11 @@ SELECT FROM ( SELECT - array_agg(users_table.user_id ORDER BY users_table.time) + array_agg(users_table.value_2 ORDER BY users_table.time) FROM users_table, (SELECT user_id FROM events_table) as evs WHERE users_table.user_id = evs.user_id - GROUP BY users_table.user_id + GROUP BY users_table.value_2 LIMIT 5 ) as foo; ERROR: array_agg with order by is unsupported diff --git a/src/test/regress/output/multi_complex_count_distinct.source b/src/test/regress/output/multi_complex_count_distinct.source index 769fc0c77..c1907f659 100644 --- a/src/test/regress/output/multi_complex_count_distinct.source +++ b/src/test/regress/output/multi_complex_count_distinct.source @@ -33,7 +33,7 @@ SELECT create_distributed_table('lineitem_hash', 'l_orderkey', 'hash'); ANALYZE lineitem_hash; SET citus.task_executor_type to "task-tracker"; -- count(distinct) is supported on top level query if there --- is a grouping on the partition key +-- is a grouping on the partition key SELECT l_orderkey, count(DISTINCT l_partkey) FROM lineitem_hash @@ -61,33 +61,30 @@ SELECT GROUP BY l_orderkey ORDER BY 2 DESC, 1 DESC LIMIT 10; - QUERY PLAN ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- + QUERY PLAN +---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- Limit - Output: remote_scan.l_orderkey, (COALESCE((pg_catalog.sum(remote_scan.count))::bigint, '0'::bigint)) + Output: remote_scan.l_orderkey, remote_scan.count -> Sort - Output: remote_scan.l_orderkey, (COALESCE((pg_catalog.sum(remote_scan.count))::bigint, '0'::bigint)) - Sort Key: (COALESCE((pg_catalog.sum(remote_scan.count))::bigint, '0'::bigint)) DESC, remote_scan.l_orderkey DESC - -> HashAggregate - Output: remote_scan.l_orderkey, COALESCE((pg_catalog.sum(remote_scan.count))::bigint, '0'::bigint) - Group Key: remote_scan.l_orderkey - -> Custom Scan (Citus Task-Tracker) - Output: remote_scan.l_orderkey, remote_scan.count - Task Count: 8 - Tasks Shown: One of 8 - -> Task - Node: host=localhost port=57637 dbname=regression - -> Limit + Output: remote_scan.l_orderkey, remote_scan.count + Sort Key: remote_scan.count DESC, remote_scan.l_orderkey DESC + -> Custom Scan (Citus Task-Tracker) + Output: remote_scan.l_orderkey, remote_scan.count + Task Count: 8 + Tasks Shown: One of 8 + -> Task + Node: host=localhost port=57637 dbname=regression + -> Limit + Output: l_orderkey, (count(DISTINCT l_partkey)) + -> Sort Output: l_orderkey, (count(DISTINCT l_partkey)) - -> Sort - Output: l_orderkey, (count(DISTINCT l_partkey)) - Sort Key: (count(DISTINCT lineitem_hash.l_partkey)) DESC, lineitem_hash.l_orderkey DESC - -> GroupAggregate - Output: l_orderkey, count(DISTINCT l_partkey) - Group Key: lineitem_hash.l_orderkey - -> Index Scan Backward using lineitem_hash_pkey_240000 on public.lineitem_hash_240000 lineitem_hash - Output: l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment -(24 rows) + Sort Key: (count(DISTINCT lineitem_hash.l_partkey)) DESC, lineitem_hash.l_orderkey DESC + -> GroupAggregate + Output: l_orderkey, count(DISTINCT l_partkey) + Group Key: lineitem_hash.l_orderkey + -> Index Scan Backward using lineitem_hash_pkey_240000 on public.lineitem_hash_240000 lineitem_hash + Output: l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment +(21 rows) -- it is also supported if there is no grouping or grouping is on non-partition field SELECT @@ -206,33 +203,30 @@ SELECT GROUP BY l_orderkey ORDER BY 3 DESC, 2 DESC, 1 LIMIT 10; - QUERY PLAN ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- + QUERY PLAN +---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- Limit - Output: remote_scan.l_orderkey, (COALESCE((pg_catalog.sum(remote_scan.count))::bigint, '0'::bigint)), (COALESCE((pg_catalog.sum(remote_scan.count_1))::bigint, '0'::bigint)) + Output: remote_scan.l_orderkey, remote_scan.count, remote_scan.count_1 -> Sort - Output: remote_scan.l_orderkey, (COALESCE((pg_catalog.sum(remote_scan.count))::bigint, '0'::bigint)), (COALESCE((pg_catalog.sum(remote_scan.count_1))::bigint, '0'::bigint)) - Sort Key: (COALESCE((pg_catalog.sum(remote_scan.count_1))::bigint, '0'::bigint)) DESC, (COALESCE((pg_catalog.sum(remote_scan.count))::bigint, '0'::bigint)) DESC, remote_scan.l_orderkey - -> HashAggregate - Output: remote_scan.l_orderkey, COALESCE((pg_catalog.sum(remote_scan.count))::bigint, '0'::bigint), COALESCE((pg_catalog.sum(remote_scan.count_1))::bigint, '0'::bigint) - Group Key: remote_scan.l_orderkey - -> Custom Scan (Citus Task-Tracker) - Output: remote_scan.l_orderkey, remote_scan.count, remote_scan.count_1 - Task Count: 8 - Tasks Shown: One of 8 - -> Task - Node: host=localhost port=57637 dbname=regression - -> Limit + Output: remote_scan.l_orderkey, remote_scan.count, remote_scan.count_1 + Sort Key: remote_scan.count_1 DESC, remote_scan.count DESC, remote_scan.l_orderkey + -> Custom Scan (Citus Task-Tracker) + Output: remote_scan.l_orderkey, remote_scan.count, remote_scan.count_1 + Task Count: 8 + Tasks Shown: One of 8 + -> Task + Node: host=localhost port=57637 dbname=regression + -> Limit + Output: l_orderkey, (count(DISTINCT l_partkey)), (count(DISTINCT l_shipmode)) + -> Sort Output: l_orderkey, (count(DISTINCT l_partkey)), (count(DISTINCT l_shipmode)) - -> Sort - Output: l_orderkey, (count(DISTINCT l_partkey)), (count(DISTINCT l_shipmode)) - Sort Key: (count(DISTINCT lineitem_hash.l_shipmode)) DESC, (count(DISTINCT lineitem_hash.l_partkey)) DESC, lineitem_hash.l_orderkey - -> GroupAggregate - Output: l_orderkey, count(DISTINCT l_partkey), count(DISTINCT l_shipmode) - Group Key: lineitem_hash.l_orderkey - -> Index Scan using lineitem_hash_pkey_240000 on public.lineitem_hash_240000 lineitem_hash - Output: l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment -(24 rows) + Sort Key: (count(DISTINCT lineitem_hash.l_shipmode)) DESC, (count(DISTINCT lineitem_hash.l_partkey)) DESC, lineitem_hash.l_orderkey + -> GroupAggregate + Output: l_orderkey, count(DISTINCT l_partkey), count(DISTINCT l_shipmode) + Group Key: lineitem_hash.l_orderkey + -> Index Scan using lineitem_hash_pkey_240000 on public.lineitem_hash_240000 lineitem_hash + Output: l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment +(21 rows) -- partition/non-partition column count distinct no grouping SELECT @@ -490,33 +484,30 @@ SELECT GROUP BY l_orderkey ORDER BY 2 DESC, 3 DESC, 1 LIMIT 10; - QUERY PLAN ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- + QUERY PLAN +---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- Limit - Output: remote_scan.l_orderkey, (COALESCE((pg_catalog.sum(remote_scan.count))::bigint, '0'::bigint)), (COALESCE((pg_catalog.sum(remote_scan.count_1))::bigint, '0'::bigint)) + Output: remote_scan.l_orderkey, remote_scan.count, remote_scan.count_1 -> Sort - Output: remote_scan.l_orderkey, (COALESCE((pg_catalog.sum(remote_scan.count))::bigint, '0'::bigint)), (COALESCE((pg_catalog.sum(remote_scan.count_1))::bigint, '0'::bigint)) - Sort Key: (COALESCE((pg_catalog.sum(remote_scan.count))::bigint, '0'::bigint)) DESC, (COALESCE((pg_catalog.sum(remote_scan.count_1))::bigint, '0'::bigint)) DESC, remote_scan.l_orderkey - -> HashAggregate - Output: remote_scan.l_orderkey, COALESCE((pg_catalog.sum(remote_scan.count))::bigint, '0'::bigint), COALESCE((pg_catalog.sum(remote_scan.count_1))::bigint, '0'::bigint) - Group Key: remote_scan.l_orderkey - -> Custom Scan (Citus Task-Tracker) - Output: remote_scan.l_orderkey, remote_scan.count, remote_scan.count_1 - Task Count: 8 - Tasks Shown: One of 8 - -> Task - Node: host=localhost port=57637 dbname=regression - -> Limit + Output: remote_scan.l_orderkey, remote_scan.count, remote_scan.count_1 + Sort Key: remote_scan.count DESC, remote_scan.count_1 DESC, remote_scan.l_orderkey + -> Custom Scan (Citus Task-Tracker) + Output: remote_scan.l_orderkey, remote_scan.count, remote_scan.count_1 + Task Count: 8 + Tasks Shown: One of 8 + -> Task + Node: host=localhost port=57637 dbname=regression + -> Limit + Output: l_orderkey, (count(DISTINCT l_suppkey) FILTER (WHERE (l_shipmode = 'AIR'::bpchar))), (count(DISTINCT l_suppkey)) + -> Sort Output: l_orderkey, (count(DISTINCT l_suppkey) FILTER (WHERE (l_shipmode = 'AIR'::bpchar))), (count(DISTINCT l_suppkey)) - -> Sort - Output: l_orderkey, (count(DISTINCT l_suppkey) FILTER (WHERE (l_shipmode = 'AIR'::bpchar))), (count(DISTINCT l_suppkey)) - Sort Key: (count(DISTINCT lineitem_hash.l_suppkey) FILTER (WHERE (lineitem_hash.l_shipmode = 'AIR'::bpchar))) DESC, (count(DISTINCT lineitem_hash.l_suppkey)) DESC, lineitem_hash.l_orderkey - -> GroupAggregate - Output: l_orderkey, count(DISTINCT l_suppkey) FILTER (WHERE (l_shipmode = 'AIR'::bpchar)), count(DISTINCT l_suppkey) - Group Key: lineitem_hash.l_orderkey - -> Index Scan using lineitem_hash_pkey_240000 on public.lineitem_hash_240000 lineitem_hash - Output: l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment -(24 rows) + Sort Key: (count(DISTINCT lineitem_hash.l_suppkey) FILTER (WHERE (lineitem_hash.l_shipmode = 'AIR'::bpchar))) DESC, (count(DISTINCT lineitem_hash.l_suppkey)) DESC, lineitem_hash.l_orderkey + -> GroupAggregate + Output: l_orderkey, count(DISTINCT l_suppkey) FILTER (WHERE (l_shipmode = 'AIR'::bpchar)), count(DISTINCT l_suppkey) + Group Key: lineitem_hash.l_orderkey + -> Index Scan using lineitem_hash_pkey_240000 on public.lineitem_hash_240000 lineitem_hash + Output: l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment +(21 rows) -- group by on non-partition column SELECT @@ -922,7 +913,7 @@ SELECT * -- distinct on non-var (type cast/field select) columns are also -- supported if grouped on distribution column -- random is added to prevent flattening by postgresql -SELECT +SELECT l_orderkey, count(a::int), count(distinct a::int) FROM ( SELECT l_orderkey, l_orderkey * 1.5 a, random() b diff --git a/src/test/regress/sql/aggregate_support.sql b/src/test/regress/sql/aggregate_support.sql index f0360bfc9..7641c3934 100644 --- a/src/test/regress/sql/aggregate_support.sql +++ b/src/test/regress/sql/aggregate_support.sql @@ -97,6 +97,14 @@ SELECT create_distributed_function('last(anyelement)'); SELECT key, first(val ORDER BY id), last(val ORDER BY id) FROM aggdata GROUP BY key ORDER BY key; +-- However, GROUP BY on distribution column gets pushed down +SELECT id, first(val ORDER BY key), last(val ORDER BY key) +FROM aggdata GROUP BY id ORDER BY id; + +-- Test that expressions don't slip past. This fails +SELECT id%5, first(val ORDER BY key), last(val ORDER BY key) +FROM aggdata GROUP BY id%5 ORDER BY id%5; + -- test aggregate with stype which is not a by-value datum -- also test our handling of the aggregate not existing on workers create function sumstring_sfunc(state text, x text) diff --git a/src/test/regress/sql/subqueries_not_supported.sql b/src/test/regress/sql/subqueries_not_supported.sql index ce4f9fbdb..470599d33 100644 --- a/src/test/regress/sql/subqueries_not_supported.sql +++ b/src/test/regress/sql/subqueries_not_supported.sql @@ -53,11 +53,11 @@ SELECT FROM ( SELECT - array_agg(users_table.user_id ORDER BY users_table.time) + array_agg(users_table.value_2 ORDER BY users_table.time) FROM users_table, (SELECT user_id FROM events_table) as evs WHERE users_table.user_id = evs.user_id - GROUP BY users_table.user_id + GROUP BY users_table.value_2 LIMIT 5 ) as foo;