From 863bf49507ea2c59624b86ffaea571ea7dcbc1a5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Mon, 16 Dec 2019 19:40:35 +0000 Subject: [PATCH] Implement pulling up rows to coordinator when aggregates cannot be pushed down. Enabled by default --- .../planner/extended_op_node_utils.c | 4 +- .../planner/multi_logical_optimizer.c | 478 +++++++++++++----- .../planner/multi_master_planner.c | 16 +- src/backend/distributed/shared_library_init.c | 19 + .../distributed/extended_op_node_utils.h | 5 +- .../distributed/multi_logical_optimizer.h | 13 +- .../regress/expected/aggregate_support.out | 148 ++++++ .../expected/custom_aggregate_support.out | 1 + .../expected/custom_aggregate_support_0.out | 1 + .../expected/custom_aggregate_support_1.out | 1 + .../multi_agg_approximate_distinct.out | 1 + .../multi_agg_approximate_distinct_0.out | 1 + src/test/regress/expected/multi_array_agg.out | 1 + src/test/regress/expected/multi_json_agg.out | 1 + .../expected/multi_json_object_agg.out | 1 + src/test/regress/expected/multi_jsonb_agg.out | 1 + .../expected/multi_jsonb_object_agg.out | 1 + .../regress/expected/multi_simple_queries.out | 1 + .../expected/multi_simple_queries_0.out | 1 + src/test/regress/expected/multi_subquery.out | 1 + src/test/regress/expected/set_operations.out | 1 + .../expected/subqueries_not_supported.out | 1 + src/test/regress/expected/with_basics.out | 1 + .../regress/input/multi_agg_distinct.source | 1 + .../input/multi_complex_count_distinct.source | 1 + .../regress/output/multi_agg_distinct.source | 1 + .../multi_complex_count_distinct.source | 1 + src/test/regress/sql/aggregate_support.sql | 40 ++ .../regress/sql/custom_aggregate_support.sql | 1 + .../sql/multi_agg_approximate_distinct.sql | 2 + src/test/regress/sql/multi_array_agg.sql | 1 + src/test/regress/sql/multi_json_agg.sql | 1 + .../regress/sql/multi_json_object_agg.sql | 1 + src/test/regress/sql/multi_jsonb_agg.sql | 1 + .../regress/sql/multi_jsonb_object_agg.sql | 1 + 
src/test/regress/sql/multi_simple_queries.sql | 1 + src/test/regress/sql/multi_subquery.sql | 1 + src/test/regress/sql/set_operations.sql | 1 + .../regress/sql/subqueries_not_supported.sql | 1 + src/test/regress/sql/with_basics.sql | 1 + 40 files changed, 609 insertions(+), 147 deletions(-) diff --git a/src/backend/distributed/planner/extended_op_node_utils.c b/src/backend/distributed/planner/extended_op_node_utils.c index 0b0fea1f6..1281d1879 100644 --- a/src/backend/distributed/planner/extended_op_node_utils.c +++ b/src/backend/distributed/planner/extended_op_node_utils.c @@ -42,7 +42,8 @@ static bool ShouldPullDistinctColumn(bool repartitionSubquery, * value should be used in a read-only manner. */ ExtendedOpNodeProperties -BuildExtendedOpNodeProperties(MultiExtendedOp *extendedOpNode) +BuildExtendedOpNodeProperties(MultiExtendedOp *extendedOpNode, bool + pullUpIntermediateRows) { ExtendedOpNodeProperties extendedOpNodeProperties; @@ -75,6 +76,7 @@ BuildExtendedOpNodeProperties(MultiExtendedOp *extendedOpNode) hasNonPartitionColumnDistinctAgg; extendedOpNodeProperties.pullDistinctColumns = pullDistinctColumns; extendedOpNodeProperties.pushDownWindowFunctions = pushDownWindowFunctions; + extendedOpNodeProperties.pullUpIntermediateRows = pullUpIntermediateRows; return extendedOpNodeProperties; } diff --git a/src/backend/distributed/planner/multi_logical_optimizer.c b/src/backend/distributed/planner/multi_logical_optimizer.c index 5a41e91bc..1583d8f11 100644 --- a/src/backend/distributed/planner/multi_logical_optimizer.c +++ b/src/backend/distributed/planner/multi_logical_optimizer.c @@ -27,8 +27,10 @@ #include "distributed/citus_nodes.h" #include "distributed/citus_ruleutils.h" #include "distributed/colocation_utils.h" +#include "distributed/errormessage.h" #include "distributed/extended_op_node_utils.h" #include "distributed/function_utils.h" +#include "distributed/listutils.h" #include "distributed/metadata_cache.h" #include 
"distributed/multi_logical_optimizer.h" #include "distributed/multi_logical_planner.h" @@ -59,19 +61,19 @@ /* Config variable managed via guc.c */ int LimitClauseRowFetchCount = -1; /* number of rows to fetch from each task */ double CountDistinctErrorRate = 0.0; /* precision of count(distinct) approximate */ - +int CoordinatorAggregationStrategy = COORDINATOR_AGGREGATION_ROW_GATHER; typedef struct MasterAggregateWalkerContext { + const ExtendedOpNodeProperties *extendedOpNodeProperties; AttrNumber columnId; - bool pullDistinctColumns; } MasterAggregateWalkerContext; typedef struct WorkerAggregateWalkerContext { + const ExtendedOpNodeProperties *extendedOpNodeProperties; List *expressionList; bool createGroupByClause; - bool pullDistinctColumns; } WorkerAggregateWalkerContext; @@ -189,7 +191,8 @@ static void ParentSetNewChild(MultiNode *parentNode, MultiNode *oldChildNode, static void ApplyExtendedOpNodes(MultiExtendedOp *originalNode, MultiExtendedOp *masterNode, MultiExtendedOp *workerNode); -static void TransformSubqueryNode(MultiTable *subqueryNode); +static void TransformSubqueryNode(MultiTable *subqueryNode, bool + requiresIntermediateRowPullUp); static MultiExtendedOp * MasterExtendedOpNode(MultiExtendedOp *originalOpNode, ExtendedOpNodeProperties * extendedOpNodeProperties); @@ -267,12 +270,18 @@ static Const * MakeIntegerConst(int32 integerValue); static Const * MakeIntegerConstInt64(int64 integerValue); /* Local functions forward declarations for aggregate expression checks */ -static void ErrorIfContainsUnsupportedAggregate(MultiNode *logicalPlanNode); -static void ErrorIfUnsupportedArrayAggregate(Aggref *arrayAggregateExpression); -static void ErrorIfUnsupportedJsonAggregate(AggregateType type, - Aggref *aggregateExpression); -static void ErrorIfUnsupportedAggregateDistinct(Aggref *aggregateExpression, - MultiNode *logicalPlanNode); +static bool RequiresIntermediateRowPullUp(MultiNode *logicalPlanNode); +static DeferredErrorMessage * 
DeferErrorIfContainsNonPushdownableAggregate( + MultiNode *logicalPlanNode); +static DeferredErrorMessage * DeferErrorIfUnsupportedArrayAggregate( + Aggref *arrayAggregateExpression); +static DeferredErrorMessage * DeferErrorIfUnsupportedJsonAggregate(AggregateType type, + Aggref * + aggregateExpression); +static DeferredErrorMessage * DeferErrorIfUnsupportedAggregateDistinct( + Aggref *aggregateExpression, + MultiNode * + logicalPlanNode); static Var * AggregateDistinctColumn(Aggref *aggregateExpression); static bool TablePartitioningSupportsDistinct(List *tableNodeList, MultiExtendedOp *opNode, @@ -315,16 +324,29 @@ MultiLogicalPlanOptimize(MultiTreeRoot *multiLogicalPlan) ListCell *collectNodeCell = NULL; ListCell *tableNodeCell = NULL; MultiNode *logicalPlanNode = (MultiNode *) multiLogicalPlan; - + bool requiresIntermediateRowPullUp = RequiresIntermediateRowPullUp(logicalPlanNode); List *extendedOpNodeList = FindNodesOfType(logicalPlanNode, T_MultiExtendedOp); MultiExtendedOp *extendedOpNode = (MultiExtendedOp *) linitial(extendedOpNodeList); ExtendedOpNodeProperties extendedOpNodeProperties = BuildExtendedOpNodeProperties( - extendedOpNode); + extendedOpNode, requiresIntermediateRowPullUp); - if (!extendedOpNodeProperties.groupedByDisjointPartitionColumn) + if (!extendedOpNodeProperties.groupedByDisjointPartitionColumn && + !extendedOpNodeProperties.pullUpIntermediateRows) { - /* check that we can optimize aggregates in the plan */ - ErrorIfContainsUnsupportedAggregate(logicalPlanNode); + DeferredErrorMessage *aggregatePushdownError = + DeferErrorIfContainsNonPushdownableAggregate(logicalPlanNode); + + if (aggregatePushdownError != NULL) + { + if (CoordinatorAggregationStrategy == COORDINATOR_AGGREGATION_DISABLED) + { + RaiseDeferredError(aggregatePushdownError, ERROR); + } + else + { + extendedOpNodeProperties.pullUpIntermediateRows = true; + } + } } /* @@ -382,7 +404,6 @@ MultiLogicalPlanOptimize(MultiTreeRoot *multiLogicalPlan) * clause list to the worker 
operator node. We then push the worker operator * node below the collect node. */ - MultiExtendedOp *masterExtendedOpNode = MasterExtendedOpNode(extendedOpNode, &extendedOpNodeProperties); MultiExtendedOp *workerExtendedOpNode = @@ -396,8 +417,23 @@ MultiLogicalPlanOptimize(MultiTreeRoot *multiLogicalPlan) MultiTable *tableNode = (MultiTable *) lfirst(tableNodeCell); if (tableNode->relationId == SUBQUERY_RELATION_ID) { - ErrorIfContainsUnsupportedAggregate((MultiNode *) tableNode); - TransformSubqueryNode(tableNode); + DeferredErrorMessage *error = + DeferErrorIfContainsNonPushdownableAggregate((MultiNode *) tableNode); + bool subqueryRequiresIntermediateRowPullUp = false; + + if (error != NULL) + { + if (CoordinatorAggregationStrategy == COORDINATOR_AGGREGATION_DISABLED) + { + RaiseDeferredError(error, ERROR); + } + else + { + subqueryRequiresIntermediateRowPullUp = true; + } + } + + TransformSubqueryNode(tableNode, subqueryRequiresIntermediateRowPullUp); } } @@ -1281,15 +1317,22 @@ ApplyExtendedOpNodes(MultiExtendedOp *originalNode, MultiExtendedOp *masterNode, * operator node. 
*/ static void -TransformSubqueryNode(MultiTable *subqueryNode) +TransformSubqueryNode(MultiTable *subqueryNode, bool requiresIntermediateRowPullUp) { + if (CoordinatorAggregationStrategy != COORDINATOR_AGGREGATION_DISABLED && + RequiresIntermediateRowPullUp((MultiNode *) subqueryNode)) + { + requiresIntermediateRowPullUp = true; + } + MultiExtendedOp *extendedOpNode = (MultiExtendedOp *) ChildNode((MultiUnaryNode *) subqueryNode); MultiNode *collectNode = ChildNode((MultiUnaryNode *) extendedOpNode); MultiNode *collectChildNode = ChildNode((MultiUnaryNode *) collectNode); ExtendedOpNodeProperties extendedOpNodeProperties = - BuildExtendedOpNodeProperties(extendedOpNode); + BuildExtendedOpNodeProperties(extendedOpNode, requiresIntermediateRowPullUp); + MultiExtendedOp *masterExtendedOpNode = MasterExtendedOpNode(extendedOpNode, &extendedOpNodeProperties); MultiExtendedOp *workerExtendedOpNode = @@ -1369,8 +1412,8 @@ MasterExtendedOpNode(MultiExtendedOp *originalOpNode, MasterAggregateWalkerContext *walkerContext = palloc0( sizeof(MasterAggregateWalkerContext)); + walkerContext->extendedOpNodeProperties = extendedOpNodeProperties; walkerContext->columnId = 1; - walkerContext->pullDistinctColumns = extendedOpNodeProperties->pullDistinctColumns; /* iterate over original target entries */ foreach(targetEntryCell, targetEntryList) @@ -1509,16 +1552,62 @@ static Expr * MasterAggregateExpression(Aggref *originalAggregate, MasterAggregateWalkerContext *walkerContext) { - AggregateType aggregateType = GetAggregateType(originalAggregate); - Expr *newMasterExpression = NULL; const uint32 masterTableId = 1; /* one table on the master node */ const Index columnLevelsUp = 0; /* normal column */ const AttrNumber argumentId = 1; /* our aggregates have single arguments */ + AggregateType aggregateType = GetAggregateType(originalAggregate); + Expr *newMasterExpression = NULL; AggClauseCosts aggregateCosts; - if (aggregateType == AGGREGATE_COUNT && originalAggregate->aggdistinct && - 
CountDistinctErrorRate == DISABLE_DISTINCT_APPROXIMATION && - walkerContext->pullDistinctColumns) + if (walkerContext->extendedOpNodeProperties->pullUpIntermediateRows) + { + Aggref *aggregate = (Aggref *) copyObject(originalAggregate); + + TargetEntry *targetEntry; + foreach_ptr(targetEntry, aggregate->args) + { + targetEntry->expr = (Expr *) + makeVar(masterTableId, walkerContext->columnId, + exprType((Node *) targetEntry->expr), + exprTypmod((Node *) targetEntry->expr), + exprCollation((Node *) targetEntry->expr), + columnLevelsUp); + walkerContext->columnId++; + } + + aggregate->aggdirectargs = NIL; + Expr *directarg; + foreach_ptr(directarg, originalAggregate->aggdirectargs) + { + if (!IsA(directarg, Const) && !IsA(directarg, Param)) + { + Var *var = makeVar(masterTableId, walkerContext->columnId, + exprType((Node *) directarg), + exprTypmod((Node *) directarg), + exprCollation((Node *) directarg), + columnLevelsUp); + aggregate->aggdirectargs = lappend(aggregate->aggdirectargs, var); + walkerContext->columnId++; + } + else + { + aggregate->aggdirectargs = lappend(aggregate->aggdirectargs, directarg); + } + } + + if (aggregate->aggfilter) + { + aggregate->aggfilter = (Expr *) + makeVar(masterTableId, walkerContext->columnId, + BOOLOID, -1, InvalidOid, columnLevelsUp); + walkerContext->columnId++; + } + + newMasterExpression = (Expr *) aggregate; + } + else if (aggregateType == AGGREGATE_COUNT && originalAggregate->aggdistinct && + CountDistinctErrorRate == DISABLE_DISTINCT_APPROXIMATION && + walkerContext->extendedOpNodeProperties->pullDistinctColumns) { Aggref *aggregate = (Aggref *) copyObject(originalAggregate); List *varList = pull_var_clause_default((Node *) aggregate); @@ -1834,7 +1923,7 @@ MasterAggregateExpression(Aggref *originalAggregate, newMasterExpression = (Expr *) unionAggregate; } - else if (aggregateType == AGGREGATE_CUSTOM) + else if (aggregateType == AGGREGATE_CUSTOM_COMBINE) { HeapTuple aggTuple = SearchSysCache1(AGGFNOID, 
ObjectIdGetDatum(originalAggregate->aggfnoid)); @@ -2102,8 +2191,17 @@ WorkerExtendedOpNode(MultiExtendedOp *originalOpNode, /* targetProjectionNumber starts from 1 */ queryTargetList.targetProjectionNumber = 1; - /* worker query always include all the group by entries in the query */ - queryGroupClause.groupClauseList = copyObject(originalGroupClauseList); + /* + * only push down grouping to worker query when pushing down aggregates + */ + if (extendedOpNodeProperties->pullUpIntermediateRows) + { + queryGroupClause.groupClauseList = NIL; + } + else + { + queryGroupClause.groupClauseList = copyObject(originalGroupClauseList); + } /* * nextSortGroupRefIndex is used by group by, window and order by clauses. @@ -2122,41 +2220,44 @@ WorkerExtendedOpNode(MultiExtendedOp *originalOpNode, &queryHavingQual, &queryTargetList, &queryGroupClause); - ProcessDistinctClauseForWorkerQuery(originalDistinctClause, hasDistinctOn, - queryGroupClause.groupClauseList, - queryHasAggregates, &queryDistinctClause, - &distinctPreventsLimitPushdown); - ProcessWindowFunctionsForWorkerQuery(originalWindowClause, originalTargetEntryList, &queryWindowClause, &queryTargetList); - /* - * Order by and limit clauses are relevant to each other, and processing - * them together makes it handy for us. 
- * - * The other parts of the query might have already prohibited pushing down - * LIMIT and ORDER BY clauses as described below: - * (1) Creating a new group by clause during aggregate mutation, or - * (2) Distinct clause is not pushed down - */ - bool groupByExtended = - list_length(queryGroupClause.groupClauseList) > originalGroupClauseLength; - if (!groupByExtended && !distinctPreventsLimitPushdown) + if (!extendedOpNodeProperties->pullUpIntermediateRows) { - /* both sort and limit clauses rely on similar information */ - OrderByLimitReference limitOrderByReference = - BuildOrderByLimitReference(hasDistinctOn, - groupedByDisjointPartitionColumn, - originalGroupClauseList, - originalSortClauseList, - originalTargetEntryList); + ProcessDistinctClauseForWorkerQuery(originalDistinctClause, hasDistinctOn, + queryGroupClause.groupClauseList, + queryHasAggregates, &queryDistinctClause, + &distinctPreventsLimitPushdown); - ProcessLimitOrderByForWorkerQuery(limitOrderByReference, originalLimitCount, - originalLimitOffset, originalSortClauseList, - originalGroupClauseList, - originalTargetEntryList, - &queryOrderByLimit, - &queryTargetList); + /* + * Order by and limit clauses are relevant to each other, and processing + * them together makes it handy for us. 
+ * + * The other parts of the query might have already prohibited pushing down + * LIMIT and ORDER BY clauses as described below: + * (1) Creating a new group by clause during aggregate mutation, or + * (2) Distinct clause is not pushed down + */ + bool groupByExtended = + list_length(queryGroupClause.groupClauseList) > originalGroupClauseLength; + if (!groupByExtended && !distinctPreventsLimitPushdown) + { + /* both sort and limit clauses rely on similar information */ + OrderByLimitReference limitOrderByReference = + BuildOrderByLimitReference(hasDistinctOn, + groupedByDisjointPartitionColumn, + originalGroupClauseList, + originalSortClauseList, + originalTargetEntryList); + + ProcessLimitOrderByForWorkerQuery(limitOrderByReference, originalLimitCount, + originalLimitOffset, originalSortClauseList, + originalGroupClauseList, + originalTargetEntryList, + &queryOrderByLimit, + &queryTargetList); + } } /* finally, fill the extended op node with the data we gathered */ @@ -2211,8 +2312,8 @@ ProcessTargetListForWorkerQuery(List *targetEntryList, WorkerAggregateWalkerContext *workerAggContext = palloc0(sizeof(WorkerAggregateWalkerContext)); + workerAggContext->extendedOpNodeProperties = extendedOpNodeProperties; workerAggContext->expressionList = NIL; - workerAggContext->pullDistinctColumns = extendedOpNodeProperties->pullDistinctColumns; /* iterate over original target entries */ foreach(targetEntryCell, targetEntryList) @@ -2274,8 +2375,6 @@ ProcessHavingClauseForWorkerQuery(Node *originalHavingQual, QueryTargetList *queryTargetList, QueryGroupClause *queryGroupClause) { - TargetEntry *targetEntry = NULL; - if (originalHavingQual == NULL) { return; @@ -2291,13 +2390,14 @@ ProcessHavingClauseForWorkerQuery(Node *originalHavingQual, */ WorkerAggregateWalkerContext *workerAggContext = palloc0( sizeof(WorkerAggregateWalkerContext)); + + workerAggContext->extendedOpNodeProperties = extendedOpNodeProperties; workerAggContext->expressionList = NIL; - 
workerAggContext->pullDistinctColumns = - extendedOpNodeProperties->pullDistinctColumns; workerAggContext->createGroupByClause = false; WorkerAggregateWalker(originalHavingQual, workerAggContext); List *newExpressionList = workerAggContext->expressionList; + TargetEntry *targetEntry = NULL; ExpandWorkerTargetEntry(newExpressionList, targetEntry, workerAggContext->createGroupByClause, @@ -2348,7 +2448,7 @@ ProcessHavingClauseForWorkerQuery(Node *originalHavingQual, * and DISTINCT ON clauses to the worker queries. * * The function also sets distinctPreventsLimitPushdown. As the name reveals, - * distinct could prevent pushwing down LIMIT clauses later in the planning. + * distinct could prevent pushing down LIMIT clauses later in the planning. * For the details, see the comments in the function. * * inputs: distinctClause, hasDistinctOn, groupClauseList, queryHasAggregates @@ -2362,14 +2462,14 @@ ProcessDistinctClauseForWorkerQuery(List *distinctClause, bool hasDistinctOn, QueryDistinctClause *queryDistinctClause, bool *distinctPreventsLimitPushdown) { - bool distinctClauseSupersetofGroupClause = false; + *distinctPreventsLimitPushdown = false; if (distinctClause == NIL) { return; } - *distinctPreventsLimitPushdown = false; + bool distinctClauseSupersetofGroupClause = false; if (groupClauseList == NIL || IsGroupBySubsetOfDistinct(groupClauseList, distinctClause)) @@ -2412,13 +2512,13 @@ ProcessDistinctClauseForWorkerQuery(List *distinctClause, bool hasDistinctOn, * pushdown. * * TODO: Citus only supports pushing down window clauses as-is under certain circumstances. - * And, at this point in the planning, we are guaraanted to process a window function + * And, at this point in the planning, we are guaranteed to process a window function * which is safe to pushdown as-is. It should also be possible to pull the relevant data * to the coordinator and apply the window clauses for the remaining cases. 
* * Note that even though Citus only pushes down the window functions, it may need to * modify the target list of the worker query when the window function refers to - * an avg(). The reason is that any aggragate which is also referred by other + * an avg(). The reason is that any aggregate which is also referred by other * target entries would be mutated by Citus. Thus, we add a copy of the same aggragate * to the worker target list to make sure that the window function refers to the * non-mutated aggragate. @@ -2465,8 +2565,8 @@ ProcessWindowFunctionsForWorkerQuery(List *windowClauseList, * Note that even Citus does push down the window clauses as-is, we may still need to * add the generated entries to the target list. The reason is that the same aggragates * might be referred from another target entry that is a bare aggragate (e.g., no window - * functions), which would have been mutated. For instance, when an average aggragate - * is mutated on the target list, the window function would refer to a sum aggragate, + * functions), which would have been mutated. For instance, when an average aggregate + * is mutated on the target list, the window function would refer to a sum aggregate, * which is obviously wrong. 
*/ queryTargetList->targetEntryList = list_concat(queryTargetList->targetEntryList, @@ -2614,7 +2714,7 @@ ExpandWorkerTargetEntry(List *expressionList, TargetEntry *originalTargetEntry, TargetEntry *newTargetEntry = GenerateWorkerTargetEntry(originalTargetEntry, newExpression, queryTargetList->targetProjectionNumber); - (queryTargetList->targetProjectionNumber)++; + queryTargetList->targetProjectionNumber++; queryTargetList->targetEntryList = lappend(queryTargetList->targetEntryList, newTargetEntry); @@ -2682,7 +2782,7 @@ GenerateWorkerTargetEntry(TargetEntry *targetEntry, Expr *workerExpression, */ if (targetEntry) { - newTargetEntry = copyObject(targetEntry); + newTargetEntry = flatCopyTargetEntry(targetEntry); } else { @@ -2699,7 +2799,7 @@ GenerateWorkerTargetEntry(TargetEntry *targetEntry, Expr *workerExpression, newTargetEntry->resname = columnNameString->data; } - /* we can generate a target entry without any expressions */ + /* we can't generate a target entry without an expression */ Assert(workerExpression != NULL); /* force resjunk to false as we may need this on the master */ @@ -2795,13 +2895,40 @@ static List * WorkerAggregateExpressionList(Aggref *originalAggregate, WorkerAggregateWalkerContext *walkerContext) { - AggregateType aggregateType = GetAggregateType(originalAggregate); List *workerAggregateList = NIL; + + if (walkerContext->extendedOpNodeProperties->pullUpIntermediateRows) + { + TargetEntry *targetEntry; + foreach_ptr(targetEntry, originalAggregate->args) + { + workerAggregateList = lappend(workerAggregateList, targetEntry->expr); + } + + Expr *directarg; + foreach_ptr(directarg, originalAggregate->aggdirectargs) + { + if (!IsA(directarg, Const) && !IsA(directarg, Param)) + { + workerAggregateList = lappend(workerAggregateList, directarg); + } + } + + if (originalAggregate->aggfilter) + { + workerAggregateList = lappend(workerAggregateList, + originalAggregate->aggfilter); + } + + return workerAggregateList; + } + + AggregateType 
aggregateType = GetAggregateType(originalAggregate); AggClauseCosts aggregateCosts; if (aggregateType == AGGREGATE_COUNT && originalAggregate->aggdistinct && CountDistinctErrorRate == DISABLE_DISTINCT_APPROXIMATION && - walkerContext->pullDistinctColumns) + walkerContext->extendedOpNodeProperties->pullDistinctColumns) { Aggref *aggregate = (Aggref *) copyObject(originalAggregate); List *columnList = pull_var_clause_default((Node *) aggregate); @@ -2910,7 +3037,7 @@ WorkerAggregateExpressionList(Aggref *originalAggregate, workerAggregateList = lappend(workerAggregateList, sumAggregate); workerAggregateList = lappend(workerAggregateList, countAggregate); } - else if (aggregateType == AGGREGATE_CUSTOM) + else if (aggregateType == AGGREGATE_CUSTOM_COMBINE) { HeapTuple aggTuple = SearchSysCache1(AGGFNOID, ObjectIdGetDatum(originalAggregate->aggfnoid)); @@ -3019,10 +3146,17 @@ GetAggregateType(Aggref *aggregateExpression) if (AggregateEnabledCustom(aggregateExpression)) { - return AGGREGATE_CUSTOM; + return AGGREGATE_CUSTOM_COMBINE; } - ereport(ERROR, (errmsg("unsupported aggregate function %s", aggregateProcName))); + if (CoordinatorAggregationStrategy == COORDINATOR_AGGREGATION_DISABLED) + { + ereport(ERROR, (errmsg("unsupported aggregate function %s", aggregateProcName))); + } + else + { + return AGGREGATE_CUSTOM_ROW_GATHER; + } } @@ -3337,14 +3471,59 @@ MakeIntegerConstInt64(int64 integerValue) /* - * ErrorIfContainsUnsupportedAggregate extracts aggregate expressions from the - * logical plan, walks over them and uses helper functions to check if we can - * transform these aggregate expressions and push them down to worker nodes. - * These helper functions error out if we cannot transform the aggregates. + * RequiresIntermediateRowPullUp checks whether any aggregate cannot be pushed down. 
*/ -static void -ErrorIfContainsUnsupportedAggregate(MultiNode *logicalPlanNode) +static bool +RequiresIntermediateRowPullUp(MultiNode *logicalPlanNode) { + if (CoordinatorAggregationStrategy == COORDINATOR_AGGREGATION_DISABLED) + { + return false; + } + + List *opNodeList = FindNodesOfType(logicalPlanNode, T_MultiExtendedOp); + MultiExtendedOp *extendedOpNode = (MultiExtendedOp *) linitial(opNodeList); + + List *targetList = extendedOpNode->targetList; + + /* + * PVC_REJECT_PLACEHOLDERS is implicit if PVC_INCLUDE_PLACEHOLDERS isn't + * specified. + */ + List *expressionList = pull_var_clause((Node *) targetList, PVC_INCLUDE_AGGREGATES | + PVC_INCLUDE_WINDOWFUNCS); + + Node *expression = NULL; + foreach_ptr(expression, expressionList) + { + /* only consider aggregate expressions */ + if (!IsA(expression, Aggref)) + { + continue; + } + + AggregateType aggregateType = GetAggregateType((Aggref *) expression); + Assert(aggregateType != AGGREGATE_INVALID_FIRST); + + if (aggregateType == AGGREGATE_CUSTOM_ROW_GATHER) + { + return true; + } + } + + return false; +} + + +/* + * DeferErrorIfContainsNonPushdownableAggregate extracts aggregate expressions from + * the logical plan, walks over them and uses helper functions to check if we + * can transform these aggregate expressions and push them down to worker nodes. 
+ */ +static DeferredErrorMessage * +DeferErrorIfContainsNonPushdownableAggregate(MultiNode *logicalPlanNode) +{ + DeferredErrorMessage *error = NULL; List *opNodeList = FindNodesOfType(logicalPlanNode, T_MultiExtendedOp); MultiExtendedOp *extendedOpNode = (MultiExtendedOp *) linitial(opNodeList); @@ -3380,92 +3559,119 @@ ErrorIfContainsUnsupportedAggregate(MultiNode *logicalPlanNode) */ if (aggregateType == AGGREGATE_ARRAY_AGG) { - ErrorIfUnsupportedArrayAggregate(aggregateExpression); + error = DeferErrorIfUnsupportedArrayAggregate(aggregateExpression); } else if (aggregateType == AGGREGATE_JSONB_AGG || aggregateType == AGGREGATE_JSON_AGG) { - ErrorIfUnsupportedJsonAggregate(aggregateType, aggregateExpression); + error = DeferErrorIfUnsupportedJsonAggregate(aggregateType, + aggregateExpression); } else if (aggregateType == AGGREGATE_JSONB_OBJECT_AGG || aggregateType == AGGREGATE_JSON_OBJECT_AGG) { - ErrorIfUnsupportedJsonAggregate(aggregateType, aggregateExpression); + error = DeferErrorIfUnsupportedJsonAggregate(aggregateType, + aggregateExpression); } else if (aggregateExpression->aggdistinct) { - ErrorIfUnsupportedAggregateDistinct(aggregateExpression, logicalPlanNode); + error = DeferErrorIfUnsupportedAggregateDistinct(aggregateExpression, + logicalPlanNode); + } + + if (error != NULL) + { + return error; } } + + return NULL; } /* - * ErrorIfUnsupportedArrayAggregate checks if we can transform the array aggregate + * DeferErrorIfUnsupportedArrayAggregate checks if we can transform the array aggregate * expression and push it down to the worker node. If we cannot transform the * aggregate, this function errors. 
*/ -static void -ErrorIfUnsupportedArrayAggregate(Aggref *arrayAggregateExpression) +static DeferredErrorMessage * +DeferErrorIfUnsupportedArrayAggregate(Aggref *arrayAggregateExpression) { /* if array_agg has order by, we error out */ if (arrayAggregateExpression->aggorder) { - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("array_agg with order by is unsupported"))); + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "array_agg with order by is unsupported", + NULL, NULL); } /* if array_agg has distinct, we error out */ if (arrayAggregateExpression->aggdistinct) { - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("array_agg (distinct) is unsupported"))); + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "array_agg (distinct) is unsupported", + NULL, NULL); } + + return NULL; } /* - * ErrorIfUnsupportedJsonAggregate checks if we can transform the json + * DeferErrorIfUnsupportedJsonAggregate checks if we can transform the json * aggregate expression and push it down to the worker node. If we cannot * transform the aggregate, this function errors. 
*/ -static void -ErrorIfUnsupportedJsonAggregate(AggregateType type, - Aggref *aggregateExpression) +static DeferredErrorMessage * +DeferErrorIfUnsupportedJsonAggregate(AggregateType type, + Aggref *aggregateExpression) { /* if json aggregate has order by, we error out */ - if (aggregateExpression->aggorder) + if (aggregateExpression->aggdistinct || aggregateExpression->aggorder) { + StringInfoData errorDetail; + initStringInfo(&errorDetail); const char *name = AggregateNames[type]; - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("%s with order by is unsupported", name))); + + appendStringInfoString(&errorDetail, name); + if (aggregateExpression->aggorder) + { + appendStringInfoString(&errorDetail, " with order by is unsupported"); + } + else + { + appendStringInfoString(&errorDetail, " (distinct) is unsupported"); + } + + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, errorDetail.data, + NULL, NULL); } - /* if json aggregate has distinct, we error out */ - if (aggregateExpression->aggdistinct) - { - const char *name = AggregateNames[type]; - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("%s (distinct) is unsupported", name))); - } + return NULL; } /* - * ErrorIfUnsupportedAggregateDistinct checks if we can transform the aggregate + * DeferErrorIfUnsupportedAggregateDistinct checks if we can transform the aggregate * (distinct expression) and push it down to the worker node. It handles count * (distinct) separately to check if we can use distinct approximations. If we * cannot transform the aggregate, this function errors. 
*/ -static void -ErrorIfUnsupportedAggregateDistinct(Aggref *aggregateExpression, - MultiNode *logicalPlanNode) +static DeferredErrorMessage * +DeferErrorIfUnsupportedAggregateDistinct(Aggref *aggregateExpression, + MultiNode *logicalPlanNode) { - char *errorDetail = NULL; + const char *errorDetail = NULL; bool distinctSupported = true; AggregateType aggregateType = GetAggregateType(aggregateExpression); + /* If we're aggregating on coordinator, this becomes simple. */ + if (aggregateType == AGGREGATE_CUSTOM_ROW_GATHER) + { + return NULL; + } + /* * We partially support count(distinct) in subqueries, other distinct aggregates in * subqueries are not supported yet. @@ -3480,9 +3686,10 @@ ErrorIfUnsupportedAggregateDistinct(Aggref *aggregateExpression, Var *column = (Var *) lfirst(columnCell); if (column->varattno <= 0) { - ereport(ERROR, (errmsg("cannot compute count (distinct)"), - errdetail("Non-column references are not supported " - "yet"))); + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "cannot compute count (distinct)", + "Non-column references are not supported yet", + NULL); } } } @@ -3496,9 +3703,10 @@ ErrorIfUnsupportedAggregateDistinct(Aggref *aggregateExpression, if (multiTable->relationId == SUBQUERY_RELATION_ID || multiTable->relationId == SUBQUERY_PUSHDOWN_RELATION_ID) { - ereport(ERROR, (errmsg("cannot compute aggregate (distinct)"), - errdetail("Only count(distinct) aggregate is " - "supported in subqueries"))); + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "cannot compute aggregate (distinct)", + "Only count(distinct) aggregate is " + "supported in subqueries", NULL); } } } @@ -3513,12 +3721,14 @@ ErrorIfUnsupportedAggregateDistinct(Aggref *aggregateExpression, /* if extension for distinct approximation is loaded, we are good */ if (distinctExtensionId != InvalidOid) { - return; + return NULL; } else { - ereport(ERROR, (errmsg("cannot compute count (distinct) approximation"), - errhint("You need to have the hll extension 
loaded."))); + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "cannot compute count (distinct) approximation", + NULL, + "You need to have the hll extension loaded."); } } @@ -3580,21 +3790,19 @@ ErrorIfUnsupportedAggregateDistinct(Aggref *aggregateExpression, /* if current aggregate expression isn't supported, error out */ if (!distinctSupported) { + const char *errorHint = NULL; if (aggregateType == AGGREGATE_COUNT) { - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("cannot compute aggregate (distinct)"), - errdetail("%s", errorDetail), - errhint("You can load the hll extension from contrib " - "packages and enable distinct approximations."))); - } - else - { - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("cannot compute aggregate (distinct)"), - errdetail("%s", errorDetail))); + errorHint = "You can load the hll extension from contrib " + "packages and enable distinct approximations."; } + + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "cannot compute aggregate (distinct)", + errorDetail, errorHint); } + + return NULL; } diff --git a/src/backend/distributed/planner/multi_master_planner.c b/src/backend/distributed/planner/multi_master_planner.c index 74874648c..377b0400d 100644 --- a/src/backend/distributed/planner/multi_master_planner.c +++ b/src/backend/distributed/planner/multi_master_planner.c @@ -50,7 +50,7 @@ static List * MasterTargetList(List *workerTargetList); static PlannedStmt * BuildSelectStatement(Query *masterQuery, List *masterTargetList, CustomScan *remoteScan); static Agg * BuildAggregatePlan(PlannerInfo *root, Query *masterQuery, Plan *subPlan); -static bool HasDistinctAggregate(Query *masterQuery); +static bool HasDistinctOrOrderByAggregate(Query *masterQuery); static bool UseGroupAggregateWithHLL(Query *masterQuery); static bool QueryContainsAggregateWithHLL(Query *query); static Plan * BuildDistinctPlan(Query *masterQuery, Plan *subPlan); @@ -361,7 +361,7 @@ FinalizeStatement(PlannerInfo 
*root, PlannedStmt *result, Plan *top_plan) /* * BuildAggregatePlan creates and returns an aggregate plan. This aggregate plan - * builds aggreation and grouping operators (if any) that are to be executed on + * builds aggregation and grouping operators (if any) that are to be executed on * the master node. */ static Agg * @@ -405,7 +405,7 @@ BuildAggregatePlan(PlannerInfo *root, Query *masterQuery, Plan *subPlan) { bool groupingIsHashable = grouping_is_hashable(groupColumnList); bool groupingIsSortable = grouping_is_sortable(groupColumnList); - bool hasDistinctAggregate = HasDistinctAggregate(masterQuery); + bool hasUnhashableAggregate = HasDistinctOrOrderByAggregate(masterQuery); if (!groupingIsHashable && !groupingIsSortable) { @@ -421,7 +421,7 @@ BuildAggregatePlan(PlannerInfo *root, Query *masterQuery, Plan *subPlan) * If the master query contains hll aggregate functions and the client set * hll.force_groupagg to on, then we choose to use group aggregation. */ - if (!enable_hashagg || !groupingIsHashable || hasDistinctAggregate || + if (!enable_hashagg || !groupingIsHashable || hasUnhashableAggregate || UseGroupAggregateWithHLL(masterQuery)) { char *messageHint = NULL; @@ -465,7 +465,7 @@ BuildAggregatePlan(PlannerInfo *root, Query *masterQuery, Plan *subPlan) * aggregate in its target list or in having clause. */ static bool -HasDistinctAggregate(Query *masterQuery) +HasDistinctOrOrderByAggregate(Query *masterQuery) { ListCell *allColumnCell = NULL; @@ -481,7 +481,7 @@ HasDistinctAggregate(Query *masterQuery) if (IsA(columnNode, Aggref)) { Aggref *aggref = (Aggref *) columnNode; - if (aggref->aggdistinct != NIL) + if (aggref->aggdistinct != NIL || aggref->aggorder != NIL) { return true; } @@ -595,9 +595,9 @@ BuildDistinctPlan(Query *masterQuery, Plan *subPlan) * Otherwise create sort+unique plan. 
*/ bool distinctClausesHashable = grouping_is_hashable(distinctClauseList); - bool hasDistinctAggregate = HasDistinctAggregate(masterQuery); + bool hasUnhashableAggregate = HasDistinctOrOrderByAggregate(masterQuery); - if (enable_hashagg && distinctClausesHashable && !hasDistinctAggregate) + if (enable_hashagg && distinctClausesHashable && !hasUnhashableAggregate) { distinctPlan = (Plan *) makeAggNode(distinctClauseList, NIL, AGG_HASHED, targetList, subPlan); diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index 1919e59e8..ebdf44198 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -140,6 +140,12 @@ static const struct config_enum_entry use_secondary_nodes_options[] = { { NULL, 0, false } }; +static const struct config_enum_entry coordinator_aggregation_options[] = { + { "disabled", COORDINATOR_AGGREGATION_DISABLED, false }, + { "row-gather", COORDINATOR_AGGREGATION_ROW_GATHER, false }, + { NULL, 0, false } +}; + static const struct config_enum_entry shard_commit_protocol_options[] = { { "1pc", COMMIT_PROTOCOL_1PC, false }, { "2pc", COMMIT_PROTOCOL_2PC, false }, @@ -1006,6 +1012,19 @@ RegisterCitusConfigVariables(void) GUC_STANDARD, NULL, NULL, NULL); + DefineCustomEnumVariable( + "citus.coordinator_aggregation_strategy", + gettext_noop("Sets the strategy for when an aggregate cannot be pushed down. 
" + "'row-gather' will pull up intermediate rows to the coordinator, " + "while 'disabled' will error if coordinator aggregation is necessary"), + NULL, + &CoordinatorAggregationStrategy, + COORDINATOR_AGGREGATION_ROW_GATHER, + coordinator_aggregation_options, + PGC_USERSET, + GUC_STANDARD, + NULL, NULL, NULL); + DefineCustomEnumVariable( "citus.multi_shard_commit_protocol", gettext_noop("Sets the commit protocol for commands modifying multiple shards."), diff --git a/src/include/distributed/extended_op_node_utils.h b/src/include/distributed/extended_op_node_utils.h index e8dcc3be3..72615f615 100644 --- a/src/include/distributed/extended_op_node_utils.h +++ b/src/include/distributed/extended_op_node_utils.h @@ -28,11 +28,12 @@ typedef struct ExtendedOpNodeProperties bool hasNonPartitionColumnDistinctAgg; bool pullDistinctColumns; bool pushDownWindowFunctions; + bool pullUpIntermediateRows; } ExtendedOpNodeProperties; -extern ExtendedOpNodeProperties BuildExtendedOpNodeProperties(MultiExtendedOp * - extendedOpNode); +extern ExtendedOpNodeProperties BuildExtendedOpNodeProperties( + MultiExtendedOp *extendedOpNode, bool pullUpIntermediateRows); #endif /* EXTENDED_OP_NODE_UTILS_H_ */ diff --git a/src/include/distributed/multi_logical_optimizer.h b/src/include/distributed/multi_logical_optimizer.h index 708d4569f..f9997c4ce 100644 --- a/src/include/distributed/multi_logical_optimizer.h +++ b/src/include/distributed/multi_logical_optimizer.h @@ -81,9 +81,19 @@ typedef enum AGGREGATE_ANY_VALUE = 20, /* AGGREGATE_CUSTOM must come last */ - AGGREGATE_CUSTOM = 21 + AGGREGATE_CUSTOM_COMBINE = 21, + AGGREGATE_CUSTOM_ROW_GATHER = 22, } AggregateType; + +/* Enumeration for citus.coordinator_aggregation_strategy GUC */ +typedef enum +{ + COORDINATOR_AGGREGATION_DISABLED, + COORDINATOR_AGGREGATION_ROW_GATHER, +} CoordinatorAggregationStrategyType; + + /* * PushDownStatus indicates whether a node can be pushed down below its child * using the commutative and distributive relational 
algebraic properties. @@ -135,6 +145,7 @@ static const char *const AggregateNames[] = { /* Config variable managed via guc.c */ extern int LimitClauseRowFetchCount; extern double CountDistinctErrorRate; +extern int CoordinatorAggregationStrategy; /* Function declaration for optimizing logical plans */ diff --git a/src/test/regress/expected/aggregate_support.out b/src/test/regress/expected/aggregate_support.out index 5b86da4b3..1bfb788db 100644 --- a/src/test/regress/expected/aggregate_support.out +++ b/src/test/regress/expected/aggregate_support.out @@ -4,6 +4,7 @@ -- Tests support for user defined aggregates create schema aggregate_support; set search_path to aggregate_support; +set citus.coordinator_aggregation_strategy to 'disabled'; -- We test with & without STRICT as our code is responsible for managing these NULL checks create function sum2_sfunc_strict(state int, x int) returns int immutable strict language plpgsql as $$ @@ -113,6 +114,19 @@ select sum2(val), sum2_strict(val) from aggdata where valf = 0; 0 | (1 row) +-- Test HAVING +select key, stddev(valf) from aggdata group by key having stddev(valf) > 2 order by key; + key | stddev +--------------------------------------------------------------------- + 1 | 6.43467170879758 +(1 row) + +select key, stddev(valf) from aggdata group by key having stddev(val::float8) > 1 order by key; + key | stddev +--------------------------------------------------------------------- + 2 | 1.01500410508201 +(1 row) + -- test polymorphic aggregates from https://github.com/citusdata/citus/issues/2397 -- we do not currently support pseudotypes for transition types, so this errors for now CREATE OR REPLACE FUNCTION first_agg(anyelement, anyelement) @@ -255,5 +269,139 @@ select array_collect_sort(val) from aggdata; (1 row) reset role; +-- Test aggregation on coordinator +set citus.coordinator_aggregation_strategy to 'row-gather'; +select key, first(val order by id), last(val order by id) +from aggdata group by key order by key; + 
key | first | last +--------------------------------------------------------------------- + 1 | 2 | + 2 | 2 | 5 + 3 | 4 | 4 + 5 | | + 6 | | + 7 | 8 | 8 + 9 | 0 | 0 +(7 rows) + +select key, sum2(distinct val), sum2_strict(distinct val) from aggdata group by key order by key; + key | sum2 | sum2_strict +--------------------------------------------------------------------- + 1 | | 4 + 2 | 20 | 20 + 3 | 8 | 8 + 5 | | + 6 | | + 7 | 16 | 16 + 9 | 0 | 0 +(7 rows) + +select key, sum2(val order by valf), sum2_strict(val order by valf) from aggdata group by key order by key; + key | sum2 | sum2_strict +--------------------------------------------------------------------- + 1 | | 4 + 2 | 20 | 20 + 3 | 8 | 8 + 5 | | + 6 | | + 7 | 16 | 16 + 9 | 0 | 0 +(7 rows) + +select string_agg(distinct floor(val/2)::text, '|' order by floor(val/2)::text) from aggdata; + string_agg +--------------------------------------------------------------------- + 0|1|2|4 +(1 row) + +select string_agg(distinct floor(val/2)::text, '|' order by floor(val/2)::text) filter (where val < 5) from aggdata; + string_agg +--------------------------------------------------------------------- + 0|1|2 +(1 row) + +select mode() within group (order by floor(val/2)) from aggdata; + mode +--------------------------------------------------------------------- + 1 +(1 row) + +select percentile_cont(0.5) within group(order by valf) from aggdata; + percentile_cont +--------------------------------------------------------------------- + 8.225 +(1 row) + +select key, percentile_cont(key/10.0) within group(order by val) from aggdata group by key; + key | percentile_cont +--------------------------------------------------------------------- + 1 | 2 + 2 | 2.4 + 3 | 4 + 5 | + 6 | + 7 | 8 + 9 | 0 +(7 rows) + +select floor(val/2), corr(valf, valf + val) from aggdata group by floor(val/2) order by 1; + floor | corr +--------------------------------------------------------------------- + 0 | + 1 | 0.991808518376741 + 2 | 1 + 4 | + | 
+(5 rows) + +select floor(val/2), corr(valf, valf + val) from aggdata group by floor(val/2) having corr(valf + val, val) < 1 order by 1; + floor | corr +--------------------------------------------------------------------- + 1 | 0.991808518376741 + 2 | 1 +(2 rows) + +select array_agg(val order by valf) from aggdata; + array_agg +--------------------------------------------------------------------- + {0,NULL,2,3,5,2,4,NULL,NULL,8,NULL} +(1 row) + +-- Test TransformSubqueryNode +SET citus.task_executor_type to "task-tracker"; +select * FROM ( + SELECT key, mode() within group (order by floor(agg1.val/2)) m from aggdata agg1 + group by key +) subq ORDER BY 2, 1 LIMIT 5; + key | m +--------------------------------------------------------------------- + 9 | 0 + 1 | 1 + 2 | 1 + 3 | 2 + 7 | 4 +(5 rows) + +select * FROM ( + SELECT key, avg(distinct floor(agg1.val/2)) m from aggdata agg1 + group by key +) subq; + key | m +--------------------------------------------------------------------- + 1 | 1 + 5 | + 3 | 2 + 7 | 4 + 6 | + 2 | 1.5 + 9 | 0 +(7 rows) + +RESET citus.task_executor_type; +-- This fails due to table types not being managed properly +select key, count(distinct aggdata) +from aggdata group by key order by 1, 2; +ERROR: type "aggregate_support.aggdata" does not exist +CONTEXT: while executing command on localhost:xxxxx set client_min_messages to error; drop schema aggregate_support cascade; diff --git a/src/test/regress/expected/custom_aggregate_support.out b/src/test/regress/expected/custom_aggregate_support.out index 606e1e2ab..f3816363d 100644 --- a/src/test/regress/expected/custom_aggregate_support.out +++ b/src/test/regress/expected/custom_aggregate_support.out @@ -11,6 +11,7 @@ WHERE name = 'hll' :create_cmd; ERROR: extension "hll" already exists SET citus.shard_count TO 4; +set citus.coordinator_aggregation_strategy to 'disabled'; CREATE TABLE raw_table (day date, user_id int); CREATE TABLE daily_uniques(day date, unique_users hll); SELECT 
create_distributed_table('raw_table', 'user_id'); diff --git a/src/test/regress/expected/custom_aggregate_support_0.out b/src/test/regress/expected/custom_aggregate_support_0.out index c2e322e52..2e7c62bf4 100644 --- a/src/test/regress/expected/custom_aggregate_support_0.out +++ b/src/test/regress/expected/custom_aggregate_support_0.out @@ -15,6 +15,7 @@ WHERE name = 'hll' (1 row) SET citus.shard_count TO 4; +set citus.coordinator_aggregation_strategy to 'disabled'; CREATE TABLE raw_table (day date, user_id int); CREATE TABLE daily_uniques(day date, unique_users hll); ERROR: type "hll" does not exist diff --git a/src/test/regress/expected/custom_aggregate_support_1.out b/src/test/regress/expected/custom_aggregate_support_1.out index 8709a716b..21dedfaaa 100644 --- a/src/test/regress/expected/custom_aggregate_support_1.out +++ b/src/test/regress/expected/custom_aggregate_support_1.out @@ -11,6 +11,7 @@ WHERE name = 'hll' :create_cmd; ERROR: extension "hll" already exists SET citus.shard_count TO 4; +set citus.coordinator_aggregation_strategy to 'disabled'; CREATE TABLE raw_table (day date, user_id int); CREATE TABLE daily_uniques(day date, unique_users hll); SELECT create_distributed_table('raw_table', 'user_id'); diff --git a/src/test/regress/expected/multi_agg_approximate_distinct.out b/src/test/regress/expected/multi_agg_approximate_distinct.out index 2831d7a7a..aaf4e0543 100644 --- a/src/test/regress/expected/multi_agg_approximate_distinct.out +++ b/src/test/regress/expected/multi_agg_approximate_distinct.out @@ -9,6 +9,7 @@ AS create_cmd FROM pg_available_extensions() WHERE name = 'hll' \gset :create_cmd; +SET citus.coordinator_aggregation_strategy TO 'disabled'; -- Try to execute count(distinct) when approximate distincts aren't enabled SELECT count(distinct l_orderkey) FROM lineitem; count diff --git a/src/test/regress/expected/multi_agg_approximate_distinct_0.out b/src/test/regress/expected/multi_agg_approximate_distinct_0.out index 43fb5dfc0..3e55c0720 
100644 --- a/src/test/regress/expected/multi_agg_approximate_distinct_0.out +++ b/src/test/regress/expected/multi_agg_approximate_distinct_0.out @@ -14,6 +14,7 @@ WHERE name = 'hll' f (1 row) +SET citus.coordinator_aggregation_strategy TO 'disabled'; -- Try to execute count(distinct) when approximate distincts aren't enabled SELECT count(distinct l_orderkey) FROM lineitem; count diff --git a/src/test/regress/expected/multi_array_agg.out b/src/test/regress/expected/multi_array_agg.out index dd673e11d..d073ce6fe 100644 --- a/src/test/regress/expected/multi_array_agg.out +++ b/src/test/regress/expected/multi_array_agg.out @@ -2,6 +2,7 @@ -- MULTI_ARRAY_AGG -- SET citus.next_shard_id TO 520000; +SET citus.coordinator_aggregation_strategy TO 'disabled'; CREATE OR REPLACE FUNCTION array_sort (ANYARRAY) RETURNS ANYARRAY LANGUAGE SQL AS $$ diff --git a/src/test/regress/expected/multi_json_agg.out b/src/test/regress/expected/multi_json_agg.out index 16503104c..c6d8ceb60 100644 --- a/src/test/regress/expected/multi_json_agg.out +++ b/src/test/regress/expected/multi_json_agg.out @@ -2,6 +2,7 @@ -- MULTI_JSON_AGG -- SET citus.next_shard_id TO 520000; +SET citus.coordinator_aggregation_strategy TO 'disabled'; CREATE OR REPLACE FUNCTION array_sort (json) RETURNS json LANGUAGE SQL AS $$ diff --git a/src/test/regress/expected/multi_json_object_agg.out b/src/test/regress/expected/multi_json_object_agg.out index adb4d7ee1..ab0bcccc7 100644 --- a/src/test/regress/expected/multi_json_object_agg.out +++ b/src/test/regress/expected/multi_json_object_agg.out @@ -2,6 +2,7 @@ -- MULTI_JSON_OBJECT_AGG -- SET citus.next_shard_id TO 520000; +SET citus.coordinator_aggregation_strategy TO 'disabled'; CREATE OR REPLACE FUNCTION count_keys (json) RETURNS bigint LANGUAGE SQL AS $$ diff --git a/src/test/regress/expected/multi_jsonb_agg.out b/src/test/regress/expected/multi_jsonb_agg.out index 79d787d86..48c17214e 100644 --- a/src/test/regress/expected/multi_jsonb_agg.out +++ 
b/src/test/regress/expected/multi_jsonb_agg.out @@ -2,6 +2,7 @@ -- MULTI_JSONB_AGG -- SET citus.next_shard_id TO 520000; +SET citus.coordinator_aggregation_strategy TO 'disabled'; CREATE OR REPLACE FUNCTION array_sort (jsonb) RETURNS jsonb LANGUAGE SQL AS $$ diff --git a/src/test/regress/expected/multi_jsonb_object_agg.out b/src/test/regress/expected/multi_jsonb_object_agg.out index 0a85c7dbd..b3e5526da 100644 --- a/src/test/regress/expected/multi_jsonb_object_agg.out +++ b/src/test/regress/expected/multi_jsonb_object_agg.out @@ -2,6 +2,7 @@ -- MULTI_JSONB_OBJECT_AGG -- SET citus.next_shard_id TO 520000; +SET citus.coordinator_aggregation_strategy TO 'disabled'; CREATE OR REPLACE FUNCTION count_keys (jsonb) RETURNS bigint LANGUAGE SQL AS $$ diff --git a/src/test/regress/expected/multi_simple_queries.out b/src/test/regress/expected/multi_simple_queries.out index fb999d87c..7270f0af3 100644 --- a/src/test/regress/expected/multi_simple_queries.out +++ b/src/test/regress/expected/multi_simple_queries.out @@ -3,6 +3,7 @@ SET citus.next_shard_id TO 850000; -- router planner, so we're explicitly disabling it in this file. -- We've bunch of other tests that triggers fast-path-router SET citus.enable_fast_path_router_planner TO false; +SET citus.coordinator_aggregation_strategy TO 'disabled'; -- =================================================================== -- test end-to-end query functionality -- =================================================================== diff --git a/src/test/regress/expected/multi_simple_queries_0.out b/src/test/regress/expected/multi_simple_queries_0.out index 134074c8d..688724418 100644 --- a/src/test/regress/expected/multi_simple_queries_0.out +++ b/src/test/regress/expected/multi_simple_queries_0.out @@ -3,6 +3,7 @@ SET citus.next_shard_id TO 850000; -- router planner, so we're explicitly disabling it in this file. 
-- We've bunch of other tests that triggers fast-path-router SET citus.enable_fast_path_router_planner TO false; +SET citus.coordinator_aggregation_strategy TO 'disabled'; -- =================================================================== -- test end-to-end query functionality -- =================================================================== diff --git a/src/test/regress/expected/multi_subquery.out b/src/test/regress/expected/multi_subquery.out index c80d1597f..76f336320 100644 --- a/src/test/regress/expected/multi_subquery.out +++ b/src/test/regress/expected/multi_subquery.out @@ -3,6 +3,7 @@ -- -- no need to set shardid sequence given that we're not creating any shards SET citus.next_shard_id TO 570032; +SET citus.coordinator_aggregation_strategy TO 'disabled'; -- Check that we error out if shard min/max values are not exactly same. SELECT avg(unit_price) diff --git a/src/test/regress/expected/set_operations.out b/src/test/regress/expected/set_operations.out index a29f711d1..7ef4deeb0 100644 --- a/src/test/regress/expected/set_operations.out +++ b/src/test/regress/expected/set_operations.out @@ -1,5 +1,6 @@ CREATE SCHEMA recursive_union; SET search_path TO recursive_union, public; +SET citus.coordinator_aggregation_strategy TO 'disabled'; CREATE TABLE recursive_union.test (x int, y int); SELECT create_distributed_table('test', 'x'); create_distributed_table diff --git a/src/test/regress/expected/subqueries_not_supported.out b/src/test/regress/expected/subqueries_not_supported.out index 734d801fa..2903ce2aa 100644 --- a/src/test/regress/expected/subqueries_not_supported.out +++ b/src/test/regress/expected/subqueries_not_supported.out @@ -3,6 +3,7 @@ -- =================================================================== CREATE SCHEMA not_supported; SET search_path TO not_supported, public; +SET citus.coordinator_aggregation_strategy TO 'disabled'; SET client_min_messages TO DEBUG1; CREATE TABLE users_table_local AS SELECT * FROM users_table; -- we don't 
support subqueries with local tables when they are not leaf queries diff --git a/src/test/regress/expected/with_basics.out b/src/test/regress/expected/with_basics.out index 8a6df96a5..9b553fd80 100644 --- a/src/test/regress/expected/with_basics.out +++ b/src/test/regress/expected/with_basics.out @@ -1,5 +1,6 @@ -- Test the basic CTE functionality and expected error messages SET search_path TO 'with_basics'; +SET citus.coordinator_aggregation_strategy TO 'disabled'; CREATE TYPE with_basics.xy AS (x int, y int); -- CTEs in FROM should work WITH cte AS ( diff --git a/src/test/regress/input/multi_agg_distinct.source b/src/test/regress/input/multi_agg_distinct.source index 6bd652023..4ec2efd3f 100644 --- a/src/test/regress/input/multi_agg_distinct.source +++ b/src/test/regress/input/multi_agg_distinct.source @@ -2,6 +2,7 @@ -- MULTI_AGG_DISTINCT -- +SET citus.coordinator_aggregation_strategy TO 'disabled'; -- Create a new range partitioned lineitem table and load data into it CREATE TABLE lineitem_range ( diff --git a/src/test/regress/input/multi_complex_count_distinct.source b/src/test/regress/input/multi_complex_count_distinct.source index e79a1d773..221053ab0 100644 --- a/src/test/regress/input/multi_complex_count_distinct.source +++ b/src/test/regress/input/multi_complex_count_distinct.source @@ -6,6 +6,7 @@ SET citus.next_shard_id TO 240000; SET citus.shard_count TO 8; SET citus.shard_replication_factor TO 1; +SET citus.coordinator_aggregation_strategy TO 'disabled'; CREATE TABLE lineitem_hash ( l_orderkey bigint not null, diff --git a/src/test/regress/output/multi_agg_distinct.source b/src/test/regress/output/multi_agg_distinct.source index 59de22ed4..a3e21b559 100644 --- a/src/test/regress/output/multi_agg_distinct.source +++ b/src/test/regress/output/multi_agg_distinct.source @@ -1,6 +1,7 @@ -- -- MULTI_AGG_DISTINCT -- +SET citus.coordinator_aggregation_strategy TO 'disabled'; -- Create a new range partitioned lineitem table and load data into it CREATE TABLE 
lineitem_range ( l_orderkey bigint not null, diff --git a/src/test/regress/output/multi_complex_count_distinct.source b/src/test/regress/output/multi_complex_count_distinct.source index c1907f659..a2be31e78 100644 --- a/src/test/regress/output/multi_complex_count_distinct.source +++ b/src/test/regress/output/multi_complex_count_distinct.source @@ -4,6 +4,7 @@ SET citus.next_shard_id TO 240000; SET citus.shard_count TO 8; SET citus.shard_replication_factor TO 1; +SET citus.coordinator_aggregation_strategy TO 'disabled'; CREATE TABLE lineitem_hash ( l_orderkey bigint not null, l_partkey integer not null, diff --git a/src/test/regress/sql/aggregate_support.sql b/src/test/regress/sql/aggregate_support.sql index 7641c3934..ba7dcf5b0 100644 --- a/src/test/regress/sql/aggregate_support.sql +++ b/src/test/regress/sql/aggregate_support.sql @@ -5,6 +5,7 @@ create schema aggregate_support; set search_path to aggregate_support; +set citus.coordinator_aggregation_strategy to 'disabled'; -- We test with & without STRICT as our code is responsible for managing these NULL checks create function sum2_sfunc_strict(state int, x int) @@ -63,6 +64,9 @@ select id, sum2(distinct val), sum2_strict(distinct val) from aggdata group by i select key, sum2(val order by valf), sum2_strict(val order by valf) from aggdata group by key order by key; -- Test handling a lack of intermediate results select sum2(val), sum2_strict(val) from aggdata where valf = 0; +-- Test HAVING +select key, stddev(valf) from aggdata group by key having stddev(valf) > 2 order by key; +select key, stddev(valf) from aggdata group by key having stddev(val::float8) > 1 order by key; -- test polymorphic aggregates from https://github.com/citusdata/citus/issues/2397 @@ -154,5 +158,41 @@ set role notsuper; select array_collect_sort(val) from aggdata; reset role; +-- Test aggregation on coordinator +set citus.coordinator_aggregation_strategy to 'row-gather'; + +select key, first(val order by id), last(val order by id) +from 
aggdata group by key order by key; + +select key, sum2(distinct val), sum2_strict(distinct val) from aggdata group by key order by key; +select key, sum2(val order by valf), sum2_strict(val order by valf) from aggdata group by key order by key; +select string_agg(distinct floor(val/2)::text, '|' order by floor(val/2)::text) from aggdata; +select string_agg(distinct floor(val/2)::text, '|' order by floor(val/2)::text) filter (where val < 5) from aggdata; +select mode() within group (order by floor(val/2)) from aggdata; +select percentile_cont(0.5) within group(order by valf) from aggdata; +select key, percentile_cont(key/10.0) within group(order by val) from aggdata group by key; +select floor(val/2), corr(valf, valf + val) from aggdata group by floor(val/2) order by 1; +select floor(val/2), corr(valf, valf + val) from aggdata group by floor(val/2) having corr(valf + val, val) < 1 order by 1; +select array_agg(val order by valf) from aggdata; + +-- Test TransformSubqueryNode +SET citus.task_executor_type to "task-tracker"; + +select * FROM ( + SELECT key, mode() within group (order by floor(agg1.val/2)) m from aggdata agg1 + group by key +) subq ORDER BY 2, 1 LIMIT 5; + +select * FROM ( + SELECT key, avg(distinct floor(agg1.val/2)) m from aggdata agg1 + group by key +) subq; + +RESET citus.task_executor_type; + +-- This fails due to table types not being managed properly +select key, count(distinct aggdata) +from aggdata group by key order by 1, 2; + set client_min_messages to error; drop schema aggregate_support cascade; diff --git a/src/test/regress/sql/custom_aggregate_support.sql b/src/test/regress/sql/custom_aggregate_support.sql index 2ce6e880d..f601fb8b8 100644 --- a/src/test/regress/sql/custom_aggregate_support.sql +++ b/src/test/regress/sql/custom_aggregate_support.sql @@ -12,6 +12,7 @@ WHERE name = 'hll' :create_cmd; SET citus.shard_count TO 4; +set citus.coordinator_aggregation_strategy to 'disabled'; CREATE TABLE raw_table (day date, user_id int); CREATE 
TABLE daily_uniques(day date, unique_users hll); diff --git a/src/test/regress/sql/multi_agg_approximate_distinct.sql b/src/test/regress/sql/multi_agg_approximate_distinct.sql index d3d527077..d5a535bc1 100644 --- a/src/test/regress/sql/multi_agg_approximate_distinct.sql +++ b/src/test/regress/sql/multi_agg_approximate_distinct.sql @@ -13,6 +13,8 @@ WHERE name = 'hll' :create_cmd; +SET citus.coordinator_aggregation_strategy TO 'disabled'; + -- Try to execute count(distinct) when approximate distincts aren't enabled SELECT count(distinct l_orderkey) FROM lineitem; diff --git a/src/test/regress/sql/multi_array_agg.sql b/src/test/regress/sql/multi_array_agg.sql index 220076480..bd275629a 100644 --- a/src/test/regress/sql/multi_array_agg.sql +++ b/src/test/regress/sql/multi_array_agg.sql @@ -4,6 +4,7 @@ SET citus.next_shard_id TO 520000; +SET citus.coordinator_aggregation_strategy TO 'disabled'; CREATE OR REPLACE FUNCTION array_sort (ANYARRAY) RETURNS ANYARRAY LANGUAGE SQL diff --git a/src/test/regress/sql/multi_json_agg.sql b/src/test/regress/sql/multi_json_agg.sql index f107b670d..28e61497a 100644 --- a/src/test/regress/sql/multi_json_agg.sql +++ b/src/test/regress/sql/multi_json_agg.sql @@ -4,6 +4,7 @@ SET citus.next_shard_id TO 520000; +SET citus.coordinator_aggregation_strategy TO 'disabled'; CREATE OR REPLACE FUNCTION array_sort (json) RETURNS json LANGUAGE SQL diff --git a/src/test/regress/sql/multi_json_object_agg.sql b/src/test/regress/sql/multi_json_object_agg.sql index 5495fe7af..87532c880 100644 --- a/src/test/regress/sql/multi_json_object_agg.sql +++ b/src/test/regress/sql/multi_json_object_agg.sql @@ -4,6 +4,7 @@ SET citus.next_shard_id TO 520000; +SET citus.coordinator_aggregation_strategy TO 'disabled'; CREATE OR REPLACE FUNCTION count_keys (json) RETURNS bigint LANGUAGE SQL diff --git a/src/test/regress/sql/multi_jsonb_agg.sql b/src/test/regress/sql/multi_jsonb_agg.sql index 51274ed22..8fb0e5bef 100644 --- a/src/test/regress/sql/multi_jsonb_agg.sql +++ 
b/src/test/regress/sql/multi_jsonb_agg.sql @@ -4,6 +4,7 @@ SET citus.next_shard_id TO 520000; +SET citus.coordinator_aggregation_strategy TO 'disabled'; CREATE OR REPLACE FUNCTION array_sort (jsonb) RETURNS jsonb LANGUAGE SQL diff --git a/src/test/regress/sql/multi_jsonb_object_agg.sql b/src/test/regress/sql/multi_jsonb_object_agg.sql index 59963cffa..4329b5ce4 100644 --- a/src/test/regress/sql/multi_jsonb_object_agg.sql +++ b/src/test/regress/sql/multi_jsonb_object_agg.sql @@ -4,6 +4,7 @@ SET citus.next_shard_id TO 520000; +SET citus.coordinator_aggregation_strategy TO 'disabled'; CREATE OR REPLACE FUNCTION count_keys (jsonb) RETURNS bigint LANGUAGE SQL diff --git a/src/test/regress/sql/multi_simple_queries.sql b/src/test/regress/sql/multi_simple_queries.sql index 50a2b1d34..a9c776c94 100644 --- a/src/test/regress/sql/multi_simple_queries.sql +++ b/src/test/regress/sql/multi_simple_queries.sql @@ -5,6 +5,7 @@ SET citus.next_shard_id TO 850000; -- router planner, so we're explicitly disabling it in this file. -- We've bunch of other tests that triggers fast-path-router SET citus.enable_fast_path_router_planner TO false; +SET citus.coordinator_aggregation_strategy TO 'disabled'; -- =================================================================== -- test end-to-end query functionality diff --git a/src/test/regress/sql/multi_subquery.sql b/src/test/regress/sql/multi_subquery.sql index 39a67acd7..3301c35eb 100644 --- a/src/test/regress/sql/multi_subquery.sql +++ b/src/test/regress/sql/multi_subquery.sql @@ -4,6 +4,7 @@ -- no need to set shardid sequence given that we're not creating any shards SET citus.next_shard_id TO 570032; +SET citus.coordinator_aggregation_strategy TO 'disabled'; -- Check that we error out if shard min/max values are not exactly same. 
SELECT diff --git a/src/test/regress/sql/set_operations.sql b/src/test/regress/sql/set_operations.sql index 99adf3d64..03481fe5b 100644 --- a/src/test/regress/sql/set_operations.sql +++ b/src/test/regress/sql/set_operations.sql @@ -1,5 +1,6 @@ CREATE SCHEMA recursive_union; SET search_path TO recursive_union, public; +SET citus.coordinator_aggregation_strategy TO 'disabled'; CREATE TABLE recursive_union.test (x int, y int); SELECT create_distributed_table('test', 'x'); diff --git a/src/test/regress/sql/subqueries_not_supported.sql b/src/test/regress/sql/subqueries_not_supported.sql index 470599d33..4af92d435 100644 --- a/src/test/regress/sql/subqueries_not_supported.sql +++ b/src/test/regress/sql/subqueries_not_supported.sql @@ -3,6 +3,7 @@ -- =================================================================== CREATE SCHEMA not_supported; SET search_path TO not_supported, public; +SET citus.coordinator_aggregation_strategy TO 'disabled'; SET client_min_messages TO DEBUG1; diff --git a/src/test/regress/sql/with_basics.sql b/src/test/regress/sql/with_basics.sql index 3891b5818..ac556f9cd 100644 --- a/src/test/regress/sql/with_basics.sql +++ b/src/test/regress/sql/with_basics.sql @@ -1,5 +1,6 @@ -- Test the basic CTE functionality and expected error messages SET search_path TO 'with_basics'; +SET citus.coordinator_aggregation_strategy TO 'disabled'; CREATE TYPE with_basics.xy AS (x int, y int); -- CTEs in FROM should work