Implement pulling up rows to coordinator when aggregates cannot be pushed down. Enabled by default

pull/3324/head
Philip Dubé 2019-12-16 19:40:35 +00:00
parent 16b4140dc8
commit 863bf49507
40 changed files with 609 additions and 147 deletions

View File

@ -42,7 +42,8 @@ static bool ShouldPullDistinctColumn(bool repartitionSubquery,
* value should be used in a read-only manner.
*/
ExtendedOpNodeProperties
BuildExtendedOpNodeProperties(MultiExtendedOp *extendedOpNode)
BuildExtendedOpNodeProperties(MultiExtendedOp *extendedOpNode, bool
pullUpIntermediateRows)
{
ExtendedOpNodeProperties extendedOpNodeProperties;
@ -75,6 +76,7 @@ BuildExtendedOpNodeProperties(MultiExtendedOp *extendedOpNode)
hasNonPartitionColumnDistinctAgg;
extendedOpNodeProperties.pullDistinctColumns = pullDistinctColumns;
extendedOpNodeProperties.pushDownWindowFunctions = pushDownWindowFunctions;
extendedOpNodeProperties.pullUpIntermediateRows = pullUpIntermediateRows;
return extendedOpNodeProperties;
}

View File

@ -27,8 +27,10 @@
#include "distributed/citus_nodes.h"
#include "distributed/citus_ruleutils.h"
#include "distributed/colocation_utils.h"
#include "distributed/errormessage.h"
#include "distributed/extended_op_node_utils.h"
#include "distributed/function_utils.h"
#include "distributed/listutils.h"
#include "distributed/metadata_cache.h"
#include "distributed/multi_logical_optimizer.h"
#include "distributed/multi_logical_planner.h"
@ -59,19 +61,19 @@
/* Config variable managed via guc.c */
int LimitClauseRowFetchCount = -1; /* number of rows to fetch from each task */
double CountDistinctErrorRate = 0.0; /* precision of count(distinct) approximate */
int CoordinatorAggregationStrategy = COORDINATOR_AGGREGATION_ROW_GATHER;
typedef struct MasterAggregateWalkerContext
{
const ExtendedOpNodeProperties *extendedOpNodeProperties;
AttrNumber columnId;
bool pullDistinctColumns;
} MasterAggregateWalkerContext;
typedef struct WorkerAggregateWalkerContext
{
const ExtendedOpNodeProperties *extendedOpNodeProperties;
List *expressionList;
bool createGroupByClause;
bool pullDistinctColumns;
} WorkerAggregateWalkerContext;
@ -189,7 +191,8 @@ static void ParentSetNewChild(MultiNode *parentNode, MultiNode *oldChildNode,
static void ApplyExtendedOpNodes(MultiExtendedOp *originalNode,
MultiExtendedOp *masterNode,
MultiExtendedOp *workerNode);
static void TransformSubqueryNode(MultiTable *subqueryNode);
static void TransformSubqueryNode(MultiTable *subqueryNode, bool
requiresIntermediateRowPullUp);
static MultiExtendedOp * MasterExtendedOpNode(MultiExtendedOp *originalOpNode,
ExtendedOpNodeProperties *
extendedOpNodeProperties);
@ -267,12 +270,18 @@ static Const * MakeIntegerConst(int32 integerValue);
static Const * MakeIntegerConstInt64(int64 integerValue);
/* Local functions forward declarations for aggregate expression checks */
static void ErrorIfContainsUnsupportedAggregate(MultiNode *logicalPlanNode);
static void ErrorIfUnsupportedArrayAggregate(Aggref *arrayAggregateExpression);
static void ErrorIfUnsupportedJsonAggregate(AggregateType type,
Aggref *aggregateExpression);
static void ErrorIfUnsupportedAggregateDistinct(Aggref *aggregateExpression,
static bool RequiresIntermediateRowPullUp(MultiNode *logicalPlanNode);
static DeferredErrorMessage * DeferErrorIfContainsNonPushdownableAggregate(
MultiNode *logicalPlanNode);
static DeferredErrorMessage * DeferErrorIfUnsupportedArrayAggregate(
Aggref *arrayAggregateExpression);
static DeferredErrorMessage * DeferErrorIfUnsupportedJsonAggregate(AggregateType type,
Aggref *
aggregateExpression);
static DeferredErrorMessage * DeferErrorIfUnsupportedAggregateDistinct(
Aggref *aggregateExpression,
MultiNode *
logicalPlanNode);
static Var * AggregateDistinctColumn(Aggref *aggregateExpression);
static bool TablePartitioningSupportsDistinct(List *tableNodeList,
MultiExtendedOp *opNode,
@ -315,16 +324,29 @@ MultiLogicalPlanOptimize(MultiTreeRoot *multiLogicalPlan)
ListCell *collectNodeCell = NULL;
ListCell *tableNodeCell = NULL;
MultiNode *logicalPlanNode = (MultiNode *) multiLogicalPlan;
bool requiresIntermediateRowPullUp = RequiresIntermediateRowPullUp(logicalPlanNode);
List *extendedOpNodeList = FindNodesOfType(logicalPlanNode, T_MultiExtendedOp);
MultiExtendedOp *extendedOpNode = (MultiExtendedOp *) linitial(extendedOpNodeList);
ExtendedOpNodeProperties extendedOpNodeProperties = BuildExtendedOpNodeProperties(
extendedOpNode);
extendedOpNode, requiresIntermediateRowPullUp);
if (!extendedOpNodeProperties.groupedByDisjointPartitionColumn)
if (!extendedOpNodeProperties.groupedByDisjointPartitionColumn &&
!extendedOpNodeProperties.pullUpIntermediateRows)
{
/* check that we can optimize aggregates in the plan */
ErrorIfContainsUnsupportedAggregate(logicalPlanNode);
DeferredErrorMessage *aggregatePushdownError =
DeferErrorIfContainsNonPushdownableAggregate(logicalPlanNode);
if (aggregatePushdownError != NULL)
{
if (CoordinatorAggregationStrategy == COORDINATOR_AGGREGATION_DISABLED)
{
RaiseDeferredError(aggregatePushdownError, ERROR);
}
else
{
extendedOpNodeProperties.pullUpIntermediateRows = true;
}
}
}
/*
@ -382,7 +404,6 @@ MultiLogicalPlanOptimize(MultiTreeRoot *multiLogicalPlan)
* clause list to the worker operator node. We then push the worker operator
* node below the collect node.
*/
MultiExtendedOp *masterExtendedOpNode =
MasterExtendedOpNode(extendedOpNode, &extendedOpNodeProperties);
MultiExtendedOp *workerExtendedOpNode =
@ -396,8 +417,23 @@ MultiLogicalPlanOptimize(MultiTreeRoot *multiLogicalPlan)
MultiTable *tableNode = (MultiTable *) lfirst(tableNodeCell);
if (tableNode->relationId == SUBQUERY_RELATION_ID)
{
ErrorIfContainsUnsupportedAggregate((MultiNode *) tableNode);
TransformSubqueryNode(tableNode);
DeferredErrorMessage *error =
DeferErrorIfContainsNonPushdownableAggregate((MultiNode *) tableNode);
bool subqueryRequiresIntermediateRowPullUp = false;
if (error != NULL)
{
if (CoordinatorAggregationStrategy == COORDINATOR_AGGREGATION_DISABLED)
{
RaiseDeferredError(error, ERROR);
}
else
{
subqueryRequiresIntermediateRowPullUp = true;
}
}
TransformSubqueryNode(tableNode, subqueryRequiresIntermediateRowPullUp);
}
}
@ -1281,15 +1317,22 @@ ApplyExtendedOpNodes(MultiExtendedOp *originalNode, MultiExtendedOp *masterNode,
* operator node.
*/
static void
TransformSubqueryNode(MultiTable *subqueryNode)
TransformSubqueryNode(MultiTable *subqueryNode, bool requiresIntermediateRowPullUp)
{
if (CoordinatorAggregationStrategy != COORDINATOR_AGGREGATION_DISABLED &&
RequiresIntermediateRowPullUp((MultiNode *) subqueryNode))
{
requiresIntermediateRowPullUp = true;
}
MultiExtendedOp *extendedOpNode =
(MultiExtendedOp *) ChildNode((MultiUnaryNode *) subqueryNode);
MultiNode *collectNode = ChildNode((MultiUnaryNode *) extendedOpNode);
MultiNode *collectChildNode = ChildNode((MultiUnaryNode *) collectNode);
ExtendedOpNodeProperties extendedOpNodeProperties =
BuildExtendedOpNodeProperties(extendedOpNode);
BuildExtendedOpNodeProperties(extendedOpNode, requiresIntermediateRowPullUp);
MultiExtendedOp *masterExtendedOpNode =
MasterExtendedOpNode(extendedOpNode, &extendedOpNodeProperties);
MultiExtendedOp *workerExtendedOpNode =
@ -1369,8 +1412,8 @@ MasterExtendedOpNode(MultiExtendedOp *originalOpNode,
MasterAggregateWalkerContext *walkerContext = palloc0(
sizeof(MasterAggregateWalkerContext));
walkerContext->extendedOpNodeProperties = extendedOpNodeProperties;
walkerContext->columnId = 1;
walkerContext->pullDistinctColumns = extendedOpNodeProperties->pullDistinctColumns;
/* iterate over original target entries */
foreach(targetEntryCell, targetEntryList)
@ -1509,16 +1552,62 @@ static Expr *
MasterAggregateExpression(Aggref *originalAggregate,
MasterAggregateWalkerContext *walkerContext)
{
AggregateType aggregateType = GetAggregateType(originalAggregate);
Expr *newMasterExpression = NULL;
const uint32 masterTableId = 1; /* one table on the master node */
const Index columnLevelsUp = 0; /* normal column */
const AttrNumber argumentId = 1; /* our aggregates have single arguments */
AggregateType aggregateType = GetAggregateType(originalAggregate);
Expr *newMasterExpression = NULL;
AggClauseCosts aggregateCosts;
if (aggregateType == AGGREGATE_COUNT && originalAggregate->aggdistinct &&
if (walkerContext->extendedOpNodeProperties->pullUpIntermediateRows)
{
Aggref *aggregate = (Aggref *) copyObject(originalAggregate);
TargetEntry *targetEntry;
foreach_ptr(targetEntry, aggregate->args)
{
targetEntry->expr = (Expr *)
makeVar(masterTableId, walkerContext->columnId,
exprType((Node *) targetEntry->expr),
exprTypmod((Node *) targetEntry->expr),
exprCollation((Node *) targetEntry->expr),
columnLevelsUp);
walkerContext->columnId++;
}
aggregate->aggdirectargs = NIL;
Expr *directarg;
foreach_ptr(directarg, originalAggregate->aggdirectargs)
{
if (!IsA(directarg, Const) && !IsA(directarg, Param))
{
Var *var = makeVar(masterTableId, walkerContext->columnId,
exprType((Node *) directarg),
exprTypmod((Node *) directarg),
exprCollation((Node *) directarg),
columnLevelsUp);
aggregate->aggdirectargs = lappend(aggregate->aggdirectargs, var);
walkerContext->columnId++;
}
else
{
aggregate->aggdirectargs = lappend(aggregate->aggdirectargs, directarg);
}
}
if (aggregate->aggfilter)
{
aggregate->aggfilter = (Expr *)
makeVar(masterTableId, walkerContext->columnId,
BOOLOID, -1, InvalidOid, columnLevelsUp);
walkerContext->columnId++;
}
newMasterExpression = (Expr *) aggregate;
}
else if (aggregateType == AGGREGATE_COUNT && originalAggregate->aggdistinct &&
CountDistinctErrorRate == DISABLE_DISTINCT_APPROXIMATION &&
walkerContext->pullDistinctColumns)
walkerContext->extendedOpNodeProperties->pullDistinctColumns)
{
Aggref *aggregate = (Aggref *) copyObject(originalAggregate);
List *varList = pull_var_clause_default((Node *) aggregate);
@ -1834,7 +1923,7 @@ MasterAggregateExpression(Aggref *originalAggregate,
newMasterExpression = (Expr *) unionAggregate;
}
else if (aggregateType == AGGREGATE_CUSTOM)
else if (aggregateType == AGGREGATE_CUSTOM_COMBINE)
{
HeapTuple aggTuple =
SearchSysCache1(AGGFNOID, ObjectIdGetDatum(originalAggregate->aggfnoid));
@ -2102,8 +2191,17 @@ WorkerExtendedOpNode(MultiExtendedOp *originalOpNode,
/* targetProjectionNumber starts from 1 */
queryTargetList.targetProjectionNumber = 1;
/* worker query always include all the group by entries in the query */
/*
* only push down grouping to worker query when pushing down aggregates
*/
if (extendedOpNodeProperties->pullUpIntermediateRows)
{
queryGroupClause.groupClauseList = NIL;
}
else
{
queryGroupClause.groupClauseList = copyObject(originalGroupClauseList);
}
/*
* nextSortGroupRefIndex is used by group by, window and order by clauses.
@ -2122,14 +2220,16 @@ WorkerExtendedOpNode(MultiExtendedOp *originalOpNode,
&queryHavingQual, &queryTargetList,
&queryGroupClause);
ProcessWindowFunctionsForWorkerQuery(originalWindowClause, originalTargetEntryList,
&queryWindowClause, &queryTargetList);
if (!extendedOpNodeProperties->pullUpIntermediateRows)
{
ProcessDistinctClauseForWorkerQuery(originalDistinctClause, hasDistinctOn,
queryGroupClause.groupClauseList,
queryHasAggregates, &queryDistinctClause,
&distinctPreventsLimitPushdown);
ProcessWindowFunctionsForWorkerQuery(originalWindowClause, originalTargetEntryList,
&queryWindowClause, &queryTargetList);
/*
* Order by and limit clauses are relevant to each other, and processing
* them together makes it handy for us.
@ -2158,6 +2258,7 @@ WorkerExtendedOpNode(MultiExtendedOp *originalOpNode,
&queryOrderByLimit,
&queryTargetList);
}
}
/* finally, fill the extended op node with the data we gathered */
MultiExtendedOp *workerExtendedOpNode = CitusMakeNode(MultiExtendedOp);
@ -2211,8 +2312,8 @@ ProcessTargetListForWorkerQuery(List *targetEntryList,
WorkerAggregateWalkerContext *workerAggContext =
palloc0(sizeof(WorkerAggregateWalkerContext));
workerAggContext->extendedOpNodeProperties = extendedOpNodeProperties;
workerAggContext->expressionList = NIL;
workerAggContext->pullDistinctColumns = extendedOpNodeProperties->pullDistinctColumns;
/* iterate over original target entries */
foreach(targetEntryCell, targetEntryList)
@ -2274,8 +2375,6 @@ ProcessHavingClauseForWorkerQuery(Node *originalHavingQual,
QueryTargetList *queryTargetList,
QueryGroupClause *queryGroupClause)
{
TargetEntry *targetEntry = NULL;
if (originalHavingQual == NULL)
{
return;
@ -2291,13 +2390,14 @@ ProcessHavingClauseForWorkerQuery(Node *originalHavingQual,
*/
WorkerAggregateWalkerContext *workerAggContext = palloc0(
sizeof(WorkerAggregateWalkerContext));
workerAggContext->extendedOpNodeProperties = extendedOpNodeProperties;
workerAggContext->expressionList = NIL;
workerAggContext->pullDistinctColumns =
extendedOpNodeProperties->pullDistinctColumns;
workerAggContext->createGroupByClause = false;
WorkerAggregateWalker(originalHavingQual, workerAggContext);
List *newExpressionList = workerAggContext->expressionList;
TargetEntry *targetEntry = NULL;
ExpandWorkerTargetEntry(newExpressionList, targetEntry,
workerAggContext->createGroupByClause,
@ -2348,7 +2448,7 @@ ProcessHavingClauseForWorkerQuery(Node *originalHavingQual,
* and DISTINCT ON clauses to the worker queries.
*
* The function also sets distinctPreventsLimitPushdown. As the name reveals,
* distinct could prevent pushwing down LIMIT clauses later in the planning.
* distinct could prevent pushing down LIMIT clauses later in the planning.
* For the details, see the comments in the function.
*
* inputs: distinctClause, hasDistinctOn, groupClauseList, queryHasAggregates
@ -2362,14 +2462,14 @@ ProcessDistinctClauseForWorkerQuery(List *distinctClause, bool hasDistinctOn,
QueryDistinctClause *queryDistinctClause,
bool *distinctPreventsLimitPushdown)
{
bool distinctClauseSupersetofGroupClause = false;
*distinctPreventsLimitPushdown = false;
if (distinctClause == NIL)
{
return;
}
*distinctPreventsLimitPushdown = false;
bool distinctClauseSupersetofGroupClause = false;
if (groupClauseList == NIL ||
IsGroupBySubsetOfDistinct(groupClauseList, distinctClause))
@ -2412,13 +2512,13 @@ ProcessDistinctClauseForWorkerQuery(List *distinctClause, bool hasDistinctOn,
* pushdown.
*
* TODO: Citus only supports pushing down window clauses as-is under certain circumstances.
* And, at this point in the planning, we are guaraanted to process a window function
* And, at this point in the planning, we are guaranteed to process a window function
* which is safe to pushdown as-is. It should also be possible to pull the relevant data
* to the coordinator and apply the window clauses for the remaining cases.
*
* Note that even though Citus only pushes down the window functions, it may need to
* modify the target list of the worker query when the window function refers to
* an avg(). The reason is that any aggragate which is also referred by other
* an avg(). The reason is that any aggregate which is also referred by other
* target entries would be mutated by Citus. Thus, we add a copy of the same aggragate
* to the worker target list to make sure that the window function refers to the
* non-mutated aggragate.
@ -2465,8 +2565,8 @@ ProcessWindowFunctionsForWorkerQuery(List *windowClauseList,
* Note that even Citus does push down the window clauses as-is, we may still need to
* add the generated entries to the target list. The reason is that the same aggragates
* might be referred from another target entry that is a bare aggragate (e.g., no window
* functions), which would have been mutated. For instance, when an average aggragate
* is mutated on the target list, the window function would refer to a sum aggragate,
* functions), which would have been mutated. For instance, when an average aggregate
* is mutated on the target list, the window function would refer to a sum aggregate,
* which is obviously wrong.
*/
queryTargetList->targetEntryList = list_concat(queryTargetList->targetEntryList,
@ -2614,7 +2714,7 @@ ExpandWorkerTargetEntry(List *expressionList, TargetEntry *originalTargetEntry,
TargetEntry *newTargetEntry =
GenerateWorkerTargetEntry(originalTargetEntry, newExpression,
queryTargetList->targetProjectionNumber);
(queryTargetList->targetProjectionNumber)++;
queryTargetList->targetProjectionNumber++;
queryTargetList->targetEntryList =
lappend(queryTargetList->targetEntryList, newTargetEntry);
@ -2682,7 +2782,7 @@ GenerateWorkerTargetEntry(TargetEntry *targetEntry, Expr *workerExpression,
*/
if (targetEntry)
{
newTargetEntry = copyObject(targetEntry);
newTargetEntry = flatCopyTargetEntry(targetEntry);
}
else
{
@ -2699,7 +2799,7 @@ GenerateWorkerTargetEntry(TargetEntry *targetEntry, Expr *workerExpression,
newTargetEntry->resname = columnNameString->data;
}
/* we can generate a target entry without any expressions */
/* we can't generate a target entry without an expression */
Assert(workerExpression != NULL);
/* force resjunk to false as we may need this on the master */
@ -2795,13 +2895,40 @@ static List *
WorkerAggregateExpressionList(Aggref *originalAggregate,
WorkerAggregateWalkerContext *walkerContext)
{
AggregateType aggregateType = GetAggregateType(originalAggregate);
List *workerAggregateList = NIL;
if (walkerContext->extendedOpNodeProperties->pullUpIntermediateRows)
{
TargetEntry *targetEntry;
foreach_ptr(targetEntry, originalAggregate->args)
{
workerAggregateList = lappend(workerAggregateList, targetEntry->expr);
}
Expr *directarg;
foreach_ptr(directarg, originalAggregate->aggdirectargs)
{
if (!IsA(directarg, Const) && !IsA(directarg, Param))
{
workerAggregateList = lappend(workerAggregateList, directarg);
}
}
if (originalAggregate->aggfilter)
{
workerAggregateList = lappend(workerAggregateList,
originalAggregate->aggfilter);
}
return workerAggregateList;
}
AggregateType aggregateType = GetAggregateType(originalAggregate);
AggClauseCosts aggregateCosts;
if (aggregateType == AGGREGATE_COUNT && originalAggregate->aggdistinct &&
CountDistinctErrorRate == DISABLE_DISTINCT_APPROXIMATION &&
walkerContext->pullDistinctColumns)
walkerContext->extendedOpNodeProperties->pullDistinctColumns)
{
Aggref *aggregate = (Aggref *) copyObject(originalAggregate);
List *columnList = pull_var_clause_default((Node *) aggregate);
@ -2910,7 +3037,7 @@ WorkerAggregateExpressionList(Aggref *originalAggregate,
workerAggregateList = lappend(workerAggregateList, sumAggregate);
workerAggregateList = lappend(workerAggregateList, countAggregate);
}
else if (aggregateType == AGGREGATE_CUSTOM)
else if (aggregateType == AGGREGATE_CUSTOM_COMBINE)
{
HeapTuple aggTuple =
SearchSysCache1(AGGFNOID, ObjectIdGetDatum(originalAggregate->aggfnoid));
@ -3019,10 +3146,17 @@ GetAggregateType(Aggref *aggregateExpression)
if (AggregateEnabledCustom(aggregateExpression))
{
return AGGREGATE_CUSTOM;
return AGGREGATE_CUSTOM_COMBINE;
}
if (CoordinatorAggregationStrategy == COORDINATOR_AGGREGATION_DISABLED)
{
ereport(ERROR, (errmsg("unsupported aggregate function %s", aggregateProcName)));
}
else
{
return AGGREGATE_CUSTOM_ROW_GATHER;
}
}
@ -3337,14 +3471,59 @@ MakeIntegerConstInt64(int64 integerValue)
/*
* ErrorIfContainsUnsupportedAggregate extracts aggregate expressions from the
* logical plan, walks over them and uses helper functions to check if we can
* transform these aggregate expressions and push them down to worker nodes.
* These helper functions error out if we cannot transform the aggregates.
* RequiresIntermediateRowPullUp checks for if any aggregates cannot be pushed down.
*/
static void
ErrorIfContainsUnsupportedAggregate(MultiNode *logicalPlanNode)
static bool
RequiresIntermediateRowPullUp(MultiNode *logicalPlanNode)
{
if (CoordinatorAggregationStrategy == COORDINATOR_AGGREGATION_DISABLED)
{
return false;
}
List *opNodeList = FindNodesOfType(logicalPlanNode, T_MultiExtendedOp);
MultiExtendedOp *extendedOpNode = (MultiExtendedOp *) linitial(opNodeList);
List *targetList = extendedOpNode->targetList;
/*
* PVC_REJECT_PLACEHOLDERS is implicit if PVC_INCLUDE_PLACEHOLDERS isn't
* specified.
*/
List *expressionList = pull_var_clause((Node *) targetList, PVC_INCLUDE_AGGREGATES |
PVC_INCLUDE_WINDOWFUNCS);
Node *expression = NULL;
foreach_ptr(expression, expressionList)
{
/* only consider aggregate expressions */
if (!IsA(expression, Aggref))
{
continue;
}
AggregateType aggregateType = GetAggregateType((Aggref *) expression);
Assert(aggregateType != AGGREGATE_INVALID_FIRST);
if (aggregateType == AGGREGATE_CUSTOM_ROW_GATHER)
{
return true;
}
}
return false;
}
/*
* DeferErrorIfContainsNonPushdownableAggregate extracts aggregate expressions from
* the logical plan, walks over them and uses helper functions to check if we
* can transform these aggregate expressions and push them down to worker nodes.
*/
static DeferredErrorMessage *
DeferErrorIfContainsNonPushdownableAggregate(MultiNode *logicalPlanNode)
{
DeferredErrorMessage *error = NULL;
List *opNodeList = FindNodesOfType(logicalPlanNode, T_MultiExtendedOp);
MultiExtendedOp *extendedOpNode = (MultiExtendedOp *) linitial(opNodeList);
@ -3380,92 +3559,119 @@ ErrorIfContainsUnsupportedAggregate(MultiNode *logicalPlanNode)
*/
if (aggregateType == AGGREGATE_ARRAY_AGG)
{
ErrorIfUnsupportedArrayAggregate(aggregateExpression);
error = DeferErrorIfUnsupportedArrayAggregate(aggregateExpression);
}
else if (aggregateType == AGGREGATE_JSONB_AGG ||
aggregateType == AGGREGATE_JSON_AGG)
{
ErrorIfUnsupportedJsonAggregate(aggregateType, aggregateExpression);
error = DeferErrorIfUnsupportedJsonAggregate(aggregateType,
aggregateExpression);
}
else if (aggregateType == AGGREGATE_JSONB_OBJECT_AGG ||
aggregateType == AGGREGATE_JSON_OBJECT_AGG)
{
ErrorIfUnsupportedJsonAggregate(aggregateType, aggregateExpression);
error = DeferErrorIfUnsupportedJsonAggregate(aggregateType,
aggregateExpression);
}
else if (aggregateExpression->aggdistinct)
{
ErrorIfUnsupportedAggregateDistinct(aggregateExpression, logicalPlanNode);
error = DeferErrorIfUnsupportedAggregateDistinct(aggregateExpression,
logicalPlanNode);
}
if (error != NULL)
{
return error;
}
}
return NULL;
}
/*
* ErrorIfUnsupportedArrayAggregate checks if we can transform the array aggregate
* DeferErrorIfUnsupportedArrayAggregate checks if we can transform the array aggregate
* expression and push it down to the worker node. If we cannot transform the
* aggregate, this function errors.
*/
static void
ErrorIfUnsupportedArrayAggregate(Aggref *arrayAggregateExpression)
static DeferredErrorMessage *
DeferErrorIfUnsupportedArrayAggregate(Aggref *arrayAggregateExpression)
{
/* if array_agg has order by, we error out */
if (arrayAggregateExpression->aggorder)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("array_agg with order by is unsupported")));
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"array_agg with order by is unsupported",
NULL, NULL);
}
/* if array_agg has distinct, we error out */
if (arrayAggregateExpression->aggdistinct)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("array_agg (distinct) is unsupported")));
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"array_agg (distinct) is unsupported",
NULL, NULL);
}
return NULL;
}
/*
* ErrorIfUnsupportedJsonAggregate checks if we can transform the json
* DeferErrorIfUnsupportedJsonAggregate checks if we can transform the json
* aggregate expression and push it down to the worker node. If we cannot
* transform the aggregate, this function errors.
*/
static void
ErrorIfUnsupportedJsonAggregate(AggregateType type,
static DeferredErrorMessage *
DeferErrorIfUnsupportedJsonAggregate(AggregateType type,
Aggref *aggregateExpression)
{
/* if json aggregate has order by, we error out */
if (aggregateExpression->aggdistinct || aggregateExpression->aggorder)
{
StringInfoData errorDetail;
initStringInfo(&errorDetail);
const char *name = AggregateNames[type];
appendStringInfoString(&errorDetail, name);
if (aggregateExpression->aggorder)
{
const char *name = AggregateNames[type];
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("%s with order by is unsupported", name)));
appendStringInfoString(&errorDetail, " with order by is unsupported");
}
else
{
appendStringInfoString(&errorDetail, " (distinct) is unsupported");
}
/* if json aggregate has distinct, we error out */
if (aggregateExpression->aggdistinct)
{
const char *name = AggregateNames[type];
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("%s (distinct) is unsupported", name)));
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, errorDetail.data,
NULL, NULL);
}
return NULL;
}
/*
* ErrorIfUnsupportedAggregateDistinct checks if we can transform the aggregate
* DeferErrorIfUnsupportedAggregateDistinct checks if we can transform the aggregate
* (distinct expression) and push it down to the worker node. It handles count
* (distinct) separately to check if we can use distinct approximations. If we
* cannot transform the aggregate, this function errors.
*/
static void
ErrorIfUnsupportedAggregateDistinct(Aggref *aggregateExpression,
static DeferredErrorMessage *
DeferErrorIfUnsupportedAggregateDistinct(Aggref *aggregateExpression,
MultiNode *logicalPlanNode)
{
char *errorDetail = NULL;
const char *errorDetail = NULL;
bool distinctSupported = true;
AggregateType aggregateType = GetAggregateType(aggregateExpression);
/* If we're aggregating on coordinator, this becomes simple. */
if (aggregateType == AGGREGATE_CUSTOM_ROW_GATHER)
{
return NULL;
}
/*
* We partially support count(distinct) in subqueries, other distinct aggregates in
* subqueries are not supported yet.
@ -3480,9 +3686,10 @@ ErrorIfUnsupportedAggregateDistinct(Aggref *aggregateExpression,
Var *column = (Var *) lfirst(columnCell);
if (column->varattno <= 0)
{
ereport(ERROR, (errmsg("cannot compute count (distinct)"),
errdetail("Non-column references are not supported "
"yet")));
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"cannot compute count (distinct)",
"Non-column references are not supported yet",
NULL);
}
}
}
@ -3496,9 +3703,10 @@ ErrorIfUnsupportedAggregateDistinct(Aggref *aggregateExpression,
if (multiTable->relationId == SUBQUERY_RELATION_ID ||
multiTable->relationId == SUBQUERY_PUSHDOWN_RELATION_ID)
{
ereport(ERROR, (errmsg("cannot compute aggregate (distinct)"),
errdetail("Only count(distinct) aggregate is "
"supported in subqueries")));
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"cannot compute aggregate (distinct)",
"Only count(distinct) aggregate is "
"supported in subqueries", NULL);
}
}
}
@ -3513,12 +3721,14 @@ ErrorIfUnsupportedAggregateDistinct(Aggref *aggregateExpression,
/* if extension for distinct approximation is loaded, we are good */
if (distinctExtensionId != InvalidOid)
{
return;
return NULL;
}
else
{
ereport(ERROR, (errmsg("cannot compute count (distinct) approximation"),
errhint("You need to have the hll extension loaded.")));
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"cannot compute count (distinct) approximation",
NULL,
"You need to have the hll extension loaded.");
}
}
@ -3580,21 +3790,19 @@ ErrorIfUnsupportedAggregateDistinct(Aggref *aggregateExpression,
/* if current aggregate expression isn't supported, error out */
if (!distinctSupported)
{
const char *errorHint = NULL;
if (aggregateType == AGGREGATE_COUNT)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot compute aggregate (distinct)"),
errdetail("%s", errorDetail),
errhint("You can load the hll extension from contrib "
"packages and enable distinct approximations.")));
}
else
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot compute aggregate (distinct)"),
errdetail("%s", errorDetail)));
errorHint = "You can load the hll extension from contrib "
"packages and enable distinct approximations.";
}
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"cannot compute aggregate (distinct)",
errorDetail, errorHint);
}
return NULL;
}

View File

@ -50,7 +50,7 @@ static List * MasterTargetList(List *workerTargetList);
static PlannedStmt * BuildSelectStatement(Query *masterQuery, List *masterTargetList,
CustomScan *remoteScan);
static Agg * BuildAggregatePlan(PlannerInfo *root, Query *masterQuery, Plan *subPlan);
static bool HasDistinctAggregate(Query *masterQuery);
static bool HasDistinctOrOrderByAggregate(Query *masterQuery);
static bool UseGroupAggregateWithHLL(Query *masterQuery);
static bool QueryContainsAggregateWithHLL(Query *query);
static Plan * BuildDistinctPlan(Query *masterQuery, Plan *subPlan);
@ -361,7 +361,7 @@ FinalizeStatement(PlannerInfo *root, PlannedStmt *result, Plan *top_plan)
/*
* BuildAggregatePlan creates and returns an aggregate plan. This aggregate plan
* builds aggreation and grouping operators (if any) that are to be executed on
* builds aggregation and grouping operators (if any) that are to be executed on
* the master node.
*/
static Agg *
@ -405,7 +405,7 @@ BuildAggregatePlan(PlannerInfo *root, Query *masterQuery, Plan *subPlan)
{
bool groupingIsHashable = grouping_is_hashable(groupColumnList);
bool groupingIsSortable = grouping_is_sortable(groupColumnList);
bool hasDistinctAggregate = HasDistinctAggregate(masterQuery);
bool hasUnhashableAggregate = HasDistinctOrOrderByAggregate(masterQuery);
if (!groupingIsHashable && !groupingIsSortable)
{
@ -421,7 +421,7 @@ BuildAggregatePlan(PlannerInfo *root, Query *masterQuery, Plan *subPlan)
* If the master query contains hll aggregate functions and the client set
* hll.force_groupagg to on, then we choose to use group aggregation.
*/
if (!enable_hashagg || !groupingIsHashable || hasDistinctAggregate ||
if (!enable_hashagg || !groupingIsHashable || hasUnhashableAggregate ||
UseGroupAggregateWithHLL(masterQuery))
{
char *messageHint = NULL;
@ -465,7 +465,7 @@ BuildAggregatePlan(PlannerInfo *root, Query *masterQuery, Plan *subPlan)
* aggregate in its target list or in having clause.
*/
static bool
HasDistinctAggregate(Query *masterQuery)
HasDistinctOrOrderByAggregate(Query *masterQuery)
{
ListCell *allColumnCell = NULL;
@ -481,7 +481,7 @@ HasDistinctAggregate(Query *masterQuery)
if (IsA(columnNode, Aggref))
{
Aggref *aggref = (Aggref *) columnNode;
if (aggref->aggdistinct != NIL)
if (aggref->aggdistinct != NIL || aggref->aggorder != NIL)
{
return true;
}
@ -595,9 +595,9 @@ BuildDistinctPlan(Query *masterQuery, Plan *subPlan)
* Otherwise create sort+unique plan.
*/
bool distinctClausesHashable = grouping_is_hashable(distinctClauseList);
bool hasDistinctAggregate = HasDistinctAggregate(masterQuery);
bool hasUnhashableAggregate = HasDistinctOrOrderByAggregate(masterQuery);
if (enable_hashagg && distinctClausesHashable && !hasDistinctAggregate)
if (enable_hashagg && distinctClausesHashable && !hasUnhashableAggregate)
{
distinctPlan = (Plan *) makeAggNode(distinctClauseList, NIL, AGG_HASHED,
targetList, subPlan);

View File

@ -140,6 +140,12 @@ static const struct config_enum_entry use_secondary_nodes_options[] = {
{ NULL, 0, false }
};
static const struct config_enum_entry coordinator_aggregation_options[] = {
{ "disabled", COORDINATOR_AGGREGATION_DISABLED, false },
{ "row-gather", COORDINATOR_AGGREGATION_ROW_GATHER, false },
{ NULL, 0, false }
};
static const struct config_enum_entry shard_commit_protocol_options[] = {
{ "1pc", COMMIT_PROTOCOL_1PC, false },
{ "2pc", COMMIT_PROTOCOL_2PC, false },
@ -1006,6 +1012,19 @@ RegisterCitusConfigVariables(void)
GUC_STANDARD,
NULL, NULL, NULL);
DefineCustomEnumVariable(
"citus.coordinator_aggregation_strategy",
gettext_noop("Sets the strategy for when an aggregate cannot be pushed down. "
"'row-gather' will pull up intermediate rows to the coordinator, "
"while 'disabled' will error if coordinator aggregation is necessary"),
NULL,
&CoordinatorAggregationStrategy,
COORDINATOR_AGGREGATION_ROW_GATHER,
coordinator_aggregation_options,
PGC_USERSET,
GUC_STANDARD,
NULL, NULL, NULL);
DefineCustomEnumVariable(
"citus.multi_shard_commit_protocol",
gettext_noop("Sets the commit protocol for commands modifying multiple shards."),

View File

@ -28,11 +28,12 @@ typedef struct ExtendedOpNodeProperties
bool hasNonPartitionColumnDistinctAgg;
bool pullDistinctColumns;
bool pushDownWindowFunctions;
bool pullUpIntermediateRows;
} ExtendedOpNodeProperties;
extern ExtendedOpNodeProperties BuildExtendedOpNodeProperties(MultiExtendedOp *
extendedOpNode);
extern ExtendedOpNodeProperties BuildExtendedOpNodeProperties(
MultiExtendedOp *extendedOpNode, bool pullUpIntermediateRows);
#endif /* EXTENDED_OP_NODE_UTILS_H_ */

View File

@ -81,9 +81,19 @@ typedef enum
AGGREGATE_ANY_VALUE = 20,
/* AGGREGATE_CUSTOM must come last */
AGGREGATE_CUSTOM = 21
AGGREGATE_CUSTOM_COMBINE = 21,
AGGREGATE_CUSTOM_ROW_GATHER = 22,
} AggregateType;
/* Enumeration for citus.coordinator_aggregation GUC */
typedef enum
{
COORDINATOR_AGGREGATION_DISABLED,
COORDINATOR_AGGREGATION_ROW_GATHER,
} CoordinatorAggregationStrategyType;
/*
* PushDownStatus indicates whether a node can be pushed down below its child
* using the commutative and distributive relational algebraic properties.
@ -135,6 +145,7 @@ static const char *const AggregateNames[] = {
/* Config variable managed via guc.c */
extern int LimitClauseRowFetchCount;
extern double CountDistinctErrorRate;
extern int CoordinatorAggregationStrategy;
/* Function declaration for optimizing logical plans */

View File

@ -4,6 +4,7 @@
-- Tests support for user defined aggregates
create schema aggregate_support;
set search_path to aggregate_support;
set citus.coordinator_aggregation_strategy to 'disabled';
-- We test with & without STRICT as our code is responsible for managing these NULL checks
create function sum2_sfunc_strict(state int, x int)
returns int immutable strict language plpgsql as $$
@ -113,6 +114,19 @@ select sum2(val), sum2_strict(val) from aggdata where valf = 0;
0 |
(1 row)
-- Test HAVING
select key, stddev(valf) from aggdata group by key having stddev(valf) > 2 order by key;
key | stddev
---------------------------------------------------------------------
1 | 6.43467170879758
(1 row)
select key, stddev(valf) from aggdata group by key having stddev(val::float8) > 1 order by key;
key | stddev
---------------------------------------------------------------------
2 | 1.01500410508201
(1 row)
-- test polymorphic aggregates from https://github.com/citusdata/citus/issues/2397
-- we do not currently support pseudotypes for transition types, so this errors for now
CREATE OR REPLACE FUNCTION first_agg(anyelement, anyelement)
@ -255,5 +269,139 @@ select array_collect_sort(val) from aggdata;
(1 row)
reset role;
-- Test aggregation on coordinator
set citus.coordinator_aggregation_strategy to 'row-gather';
select key, first(val order by id), last(val order by id)
from aggdata group by key order by key;
key | first | last
---------------------------------------------------------------------
1 | 2 |
2 | 2 | 5
3 | 4 | 4
5 | |
6 | |
7 | 8 | 8
9 | 0 | 0
(7 rows)
select key, sum2(distinct val), sum2_strict(distinct val) from aggdata group by key order by key;
key | sum2 | sum2_strict
---------------------------------------------------------------------
1 | | 4
2 | 20 | 20
3 | 8 | 8
5 | |
6 | |
7 | 16 | 16
9 | 0 | 0
(7 rows)
select key, sum2(val order by valf), sum2_strict(val order by valf) from aggdata group by key order by key;
key | sum2 | sum2_strict
---------------------------------------------------------------------
1 | | 4
2 | 20 | 20
3 | 8 | 8
5 | |
6 | |
7 | 16 | 16
9 | 0 | 0
(7 rows)
select string_agg(distinct floor(val/2)::text, '|' order by floor(val/2)::text) from aggdata;
string_agg
---------------------------------------------------------------------
0|1|2|4
(1 row)
select string_agg(distinct floor(val/2)::text, '|' order by floor(val/2)::text) filter (where val < 5) from aggdata;
string_agg
---------------------------------------------------------------------
0|1|2
(1 row)
select mode() within group (order by floor(val/2)) from aggdata;
mode
---------------------------------------------------------------------
1
(1 row)
select percentile_cont(0.5) within group(order by valf) from aggdata;
percentile_cont
---------------------------------------------------------------------
8.225
(1 row)
select key, percentile_cont(key/10.0) within group(order by val) from aggdata group by key;
key | percentile_cont
---------------------------------------------------------------------
1 | 2
2 | 2.4
3 | 4
5 |
6 |
7 | 8
9 | 0
(7 rows)
select floor(val/2), corr(valf, valf + val) from aggdata group by floor(val/2) order by 1;
floor | corr
---------------------------------------------------------------------
0 |
1 | 0.991808518376741
2 | 1
4 |
|
(5 rows)
select floor(val/2), corr(valf, valf + val) from aggdata group by floor(val/2) having corr(valf + val, val) < 1 order by 1;
floor | corr
---------------------------------------------------------------------
1 | 0.991808518376741
2 | 1
(2 rows)
select array_agg(val order by valf) from aggdata;
array_agg
---------------------------------------------------------------------
{0,NULL,2,3,5,2,4,NULL,NULL,8,NULL}
(1 row)
-- Test TransformSubqueryNode
SET citus.task_executor_type to "task-tracker";
select * FROM (
SELECT key, mode() within group (order by floor(agg1.val/2)) m from aggdata agg1
group by key
) subq ORDER BY 2, 1 LIMIT 5;
key | m
---------------------------------------------------------------------
9 | 0
1 | 1
2 | 1
3 | 2
7 | 4
(5 rows)
select * FROM (
SELECT key, avg(distinct floor(agg1.val/2)) m from aggdata agg1
group by key
) subq;
key | m
---------------------------------------------------------------------
1 | 1
5 |
3 | 2
7 | 4
6 |
2 | 1.5
9 | 0
(7 rows)
RESET citus.task_executor_type;
-- This fails due to table types not being managed properly
select key, count(distinct aggdata)
from aggdata group by key order by 1, 2;
ERROR: type "aggregate_support.aggdata" does not exist
CONTEXT: while executing command on localhost:xxxxx
set client_min_messages to error;
drop schema aggregate_support cascade;

View File

@ -11,6 +11,7 @@ WHERE name = 'hll'
:create_cmd;
ERROR: extension "hll" already exists
SET citus.shard_count TO 4;
set citus.coordinator_aggregation_strategy to 'disabled';
CREATE TABLE raw_table (day date, user_id int);
CREATE TABLE daily_uniques(day date, unique_users hll);
SELECT create_distributed_table('raw_table', 'user_id');

View File

@ -15,6 +15,7 @@ WHERE name = 'hll'
(1 row)
SET citus.shard_count TO 4;
set citus.coordinator_aggregation_strategy to 'disabled';
CREATE TABLE raw_table (day date, user_id int);
CREATE TABLE daily_uniques(day date, unique_users hll);
ERROR: type "hll" does not exist

View File

@ -11,6 +11,7 @@ WHERE name = 'hll'
:create_cmd;
ERROR: extension "hll" already exists
SET citus.shard_count TO 4;
set citus.coordinator_aggregation_strategy to 'disabled';
CREATE TABLE raw_table (day date, user_id int);
CREATE TABLE daily_uniques(day date, unique_users hll);
SELECT create_distributed_table('raw_table', 'user_id');

View File

@ -9,6 +9,7 @@ AS create_cmd FROM pg_available_extensions()
WHERE name = 'hll'
\gset
:create_cmd;
SET citus.coordinator_aggregation_strategy TO 'disabled';
-- Try to execute count(distinct) when approximate distincts aren't enabled
SELECT count(distinct l_orderkey) FROM lineitem;
count

View File

@ -14,6 +14,7 @@ WHERE name = 'hll'
f
(1 row)
SET citus.coordinator_aggregation_strategy TO 'disabled';
-- Try to execute count(distinct) when approximate distincts aren't enabled
SELECT count(distinct l_orderkey) FROM lineitem;
count

View File

@ -2,6 +2,7 @@
-- MULTI_ARRAY_AGG
--
SET citus.next_shard_id TO 520000;
SET citus.coordinator_aggregation_strategy TO 'disabled';
CREATE OR REPLACE FUNCTION array_sort (ANYARRAY)
RETURNS ANYARRAY LANGUAGE SQL
AS $$

View File

@ -2,6 +2,7 @@
-- MULTI_JSON_AGG
--
SET citus.next_shard_id TO 520000;
SET citus.coordinator_aggregation_strategy TO 'disabled';
CREATE OR REPLACE FUNCTION array_sort (json)
RETURNS json LANGUAGE SQL
AS $$

View File

@ -2,6 +2,7 @@
-- MULTI_JSON_OBJECT_AGG
--
SET citus.next_shard_id TO 520000;
SET citus.coordinator_aggregation_strategy TO 'disabled';
CREATE OR REPLACE FUNCTION count_keys (json)
RETURNS bigint LANGUAGE SQL
AS $$

View File

@ -2,6 +2,7 @@
-- MULTI_JSONB_AGG
--
SET citus.next_shard_id TO 520000;
SET citus.coordinator_aggregation_strategy TO 'disabled';
CREATE OR REPLACE FUNCTION array_sort (jsonb)
RETURNS jsonb LANGUAGE SQL
AS $$

View File

@ -2,6 +2,7 @@
-- MULTI_JSONB_OBJECT_AGG
--
SET citus.next_shard_id TO 520000;
SET citus.coordinator_aggregation_strategy TO 'disabled';
CREATE OR REPLACE FUNCTION count_keys (jsonb)
RETURNS bigint LANGUAGE SQL
AS $$

View File

@ -3,6 +3,7 @@ SET citus.next_shard_id TO 850000;
-- router planner, so we're explicitly disabling it in this file.
-- We've bunch of other tests that triggers fast-path-router
SET citus.enable_fast_path_router_planner TO false;
SET citus.coordinator_aggregation_strategy TO 'disabled';
-- ===================================================================
-- test end-to-end query functionality
-- ===================================================================

View File

@ -3,6 +3,7 @@ SET citus.next_shard_id TO 850000;
-- router planner, so we're explicitly disabling it in this file.
-- We've bunch of other tests that triggers fast-path-router
SET citus.enable_fast_path_router_planner TO false;
SET citus.coordinator_aggregation_strategy TO 'disabled';
-- ===================================================================
-- test end-to-end query functionality
-- ===================================================================

View File

@ -3,6 +3,7 @@
--
-- no need to set shardid sequence given that we're not creating any shards
SET citus.next_shard_id TO 570032;
SET citus.coordinator_aggregation_strategy TO 'disabled';
-- Check that we error out if shard min/max values are not exactly same.
SELECT
avg(unit_price)

View File

@ -1,5 +1,6 @@
CREATE SCHEMA recursive_union;
SET search_path TO recursive_union, public;
SET citus.coordinator_aggregation_strategy TO 'disabled';
CREATE TABLE recursive_union.test (x int, y int);
SELECT create_distributed_table('test', 'x');
create_distributed_table

View File

@ -3,6 +3,7 @@
-- ===================================================================
CREATE SCHEMA not_supported;
SET search_path TO not_supported, public;
SET citus.coordinator_aggregation_strategy TO 'disabled';
SET client_min_messages TO DEBUG1;
CREATE TABLE users_table_local AS SELECT * FROM users_table;
-- we don't support subqueries with local tables when they are not leaf queries

View File

@ -1,5 +1,6 @@
-- Test the basic CTE functionality and expected error messages
SET search_path TO 'with_basics';
SET citus.coordinator_aggregation_strategy TO 'disabled';
CREATE TYPE with_basics.xy AS (x int, y int);
-- CTEs in FROM should work
WITH cte AS (

View File

@ -2,6 +2,7 @@
-- MULTI_AGG_DISTINCT
--
SET citus.coordinator_aggregation_strategy TO 'disabled';
-- Create a new range partitioned lineitem table and load data into it
CREATE TABLE lineitem_range (

View File

@ -6,6 +6,7 @@
SET citus.next_shard_id TO 240000;
SET citus.shard_count TO 8;
SET citus.shard_replication_factor TO 1;
SET citus.coordinator_aggregation_strategy TO 'disabled';
CREATE TABLE lineitem_hash (
l_orderkey bigint not null,

View File

@ -1,6 +1,7 @@
--
-- MULTI_AGG_DISTINCT
--
SET citus.coordinator_aggregation_strategy TO 'disabled';
-- Create a new range partitioned lineitem table and load data into it
CREATE TABLE lineitem_range (
l_orderkey bigint not null,

View File

@ -4,6 +4,7 @@
SET citus.next_shard_id TO 240000;
SET citus.shard_count TO 8;
SET citus.shard_replication_factor TO 1;
SET citus.coordinator_aggregation_strategy TO 'disabled';
CREATE TABLE lineitem_hash (
l_orderkey bigint not null,
l_partkey integer not null,

View File

@ -5,6 +5,7 @@
create schema aggregate_support;
set search_path to aggregate_support;
set citus.coordinator_aggregation_strategy to 'disabled';
-- We test with & without STRICT as our code is responsible for managing these NULL checks
create function sum2_sfunc_strict(state int, x int)
@ -63,6 +64,9 @@ select id, sum2(distinct val), sum2_strict(distinct val) from aggdata group by i
select key, sum2(val order by valf), sum2_strict(val order by valf) from aggdata group by key order by key;
-- Test handling a lack of intermediate results
select sum2(val), sum2_strict(val) from aggdata where valf = 0;
-- Test HAVING
select key, stddev(valf) from aggdata group by key having stddev(valf) > 2 order by key;
select key, stddev(valf) from aggdata group by key having stddev(val::float8) > 1 order by key;
-- test polymorphic aggregates from https://github.com/citusdata/citus/issues/2397
@ -154,5 +158,41 @@ set role notsuper;
select array_collect_sort(val) from aggdata;
reset role;
-- Test aggregation on coordinator
set citus.coordinator_aggregation_strategy to 'row-gather';
select key, first(val order by id), last(val order by id)
from aggdata group by key order by key;
select key, sum2(distinct val), sum2_strict(distinct val) from aggdata group by key order by key;
select key, sum2(val order by valf), sum2_strict(val order by valf) from aggdata group by key order by key;
select string_agg(distinct floor(val/2)::text, '|' order by floor(val/2)::text) from aggdata;
select string_agg(distinct floor(val/2)::text, '|' order by floor(val/2)::text) filter (where val < 5) from aggdata;
select mode() within group (order by floor(val/2)) from aggdata;
select percentile_cont(0.5) within group(order by valf) from aggdata;
select key, percentile_cont(key/10.0) within group(order by val) from aggdata group by key;
select floor(val/2), corr(valf, valf + val) from aggdata group by floor(val/2) order by 1;
select floor(val/2), corr(valf, valf + val) from aggdata group by floor(val/2) having corr(valf + val, val) < 1 order by 1;
select array_agg(val order by valf) from aggdata;
-- Test TransformSubqueryNode
SET citus.task_executor_type to "task-tracker";
select * FROM (
SELECT key, mode() within group (order by floor(agg1.val/2)) m from aggdata agg1
group by key
) subq ORDER BY 2, 1 LIMIT 5;
select * FROM (
SELECT key, avg(distinct floor(agg1.val/2)) m from aggdata agg1
group by key
) subq;
RESET citus.task_executor_type;
-- This fails due to table types not being managed properly
select key, count(distinct aggdata)
from aggdata group by key order by 1, 2;
set client_min_messages to error;
drop schema aggregate_support cascade;

View File

@ -12,6 +12,7 @@ WHERE name = 'hll'
:create_cmd;
SET citus.shard_count TO 4;
set citus.coordinator_aggregation_strategy to 'disabled';
CREATE TABLE raw_table (day date, user_id int);
CREATE TABLE daily_uniques(day date, unique_users hll);

View File

@ -13,6 +13,8 @@ WHERE name = 'hll'
:create_cmd;
SET citus.coordinator_aggregation_strategy TO 'disabled';
-- Try to execute count(distinct) when approximate distincts aren't enabled
SELECT count(distinct l_orderkey) FROM lineitem;

View File

@ -4,6 +4,7 @@
SET citus.next_shard_id TO 520000;
SET citus.coordinator_aggregation_strategy TO 'disabled';
CREATE OR REPLACE FUNCTION array_sort (ANYARRAY)
RETURNS ANYARRAY LANGUAGE SQL

View File

@ -4,6 +4,7 @@
SET citus.next_shard_id TO 520000;
SET citus.coordinator_aggregation_strategy TO 'disabled';
CREATE OR REPLACE FUNCTION array_sort (json)
RETURNS json LANGUAGE SQL

View File

@ -4,6 +4,7 @@
SET citus.next_shard_id TO 520000;
SET citus.coordinator_aggregation_strategy TO 'disabled';
CREATE OR REPLACE FUNCTION count_keys (json)
RETURNS bigint LANGUAGE SQL

View File

@ -4,6 +4,7 @@
SET citus.next_shard_id TO 520000;
SET citus.coordinator_aggregation_strategy TO 'disabled';
CREATE OR REPLACE FUNCTION array_sort (jsonb)
RETURNS jsonb LANGUAGE SQL

View File

@ -4,6 +4,7 @@
SET citus.next_shard_id TO 520000;
SET citus.coordinator_aggregation_strategy TO 'disabled';
CREATE OR REPLACE FUNCTION count_keys (jsonb)
RETURNS bigint LANGUAGE SQL

View File

@ -5,6 +5,7 @@ SET citus.next_shard_id TO 850000;
-- router planner, so we're explicitly disabling it in this file.
-- We've bunch of other tests that triggers fast-path-router
SET citus.enable_fast_path_router_planner TO false;
SET citus.coordinator_aggregation_strategy TO 'disabled';
-- ===================================================================
-- test end-to-end query functionality

View File

@ -4,6 +4,7 @@
-- no need to set shardid sequence given that we're not creating any shards
SET citus.next_shard_id TO 570032;
SET citus.coordinator_aggregation_strategy TO 'disabled';
-- Check that we error out if shard min/max values are not exactly same.
SELECT

View File

@ -1,5 +1,6 @@
CREATE SCHEMA recursive_union;
SET search_path TO recursive_union, public;
SET citus.coordinator_aggregation_strategy TO 'disabled';
CREATE TABLE recursive_union.test (x int, y int);
SELECT create_distributed_table('test', 'x');

View File

@ -3,6 +3,7 @@
-- ===================================================================
CREATE SCHEMA not_supported;
SET search_path TO not_supported, public;
SET citus.coordinator_aggregation_strategy TO 'disabled';
SET client_min_messages TO DEBUG1;

View File

@ -1,5 +1,6 @@
-- Test the basic CTE functionality and expected error messages
SET search_path TO 'with_basics';
SET citus.coordinator_aggregation_strategy TO 'disabled';
CREATE TYPE with_basics.xy AS (x int, y int);
-- CTEs in FROM should work