Feature: tdigest aggregate (#3897)

DESCRIPTION: Adds support to partially push down tdigest aggregates

tdigest extensions: https://github.com/tvondra/tdigest

This PR implements the partial pushdown of tdigest calculations when possible. The extension adds a tdigest type which can be combined into the same structure. There are several aggregate functions that can be used to get;
 - a quantile
 - a list of quantiles
 - the quantile of a hypothetical value
 - a list of quantiles for a list of hypothetical values

These function can work both on values or tdigest types.

Since we can create tdigest values either by combining them, or based on a group of values we can rewrite the aggregates in such a way that most of the computation gets delegated to the compute on the shards. This both speeds up the percentile calculations because the values don't have to be sorted while at the same time making the transfer size from the shards to the coordinator significantly less.

(cherry picked from commit da8f2b0134)
pull/3929/head
Nils Dijk 2020-06-12 13:50:28 +02:00
parent ea4549a4c0
commit 4ce6c9d8b9
No known key found for this signature in database
GPG Key ID: CA1177EF9434F241
10 changed files with 1604 additions and 9 deletions

View File

@ -2420,13 +2420,13 @@ CurrentUserName(void)
/*
* LookupTypeOid returns the Oid of the "pg_catalog.{typeNameString}" type, or
* LookupTypeOid returns the Oid of the "{schemaNameSting}.{typeNameString}" type, or
* InvalidOid if it does not exist.
*/
static Oid
LookupTypeOid(char *typeNameString)
Oid
LookupTypeOid(char *schemaNameSting, char *typeNameString)
{
Value *schemaName = makeString("pg_catalog");
Value *schemaName = makeString(schemaNameSting);
Value *typeName = makeString(typeNameString);
List *qualifiedName = list_make2(schemaName, typeName);
TypeName *enumTypeName = makeTypeNameFromNameList(qualifiedName);
@ -2458,7 +2458,7 @@ LookupTypeOid(char *typeNameString)
static Oid
LookupStringEnumValueId(char *enumName, char *valueName)
{
Oid enumTypeId = LookupTypeOid(enumName);
Oid enumTypeId = LookupTypeOid("pg_catalog", enumName);
if (enumTypeId == InvalidOid)
{

View File

@ -36,6 +36,7 @@
#include "distributed/multi_logical_planner.h"
#include "distributed/multi_physical_planner.h"
#include "distributed/pg_dist_partition.h"
#include "distributed/tdigest_extension.h"
#include "distributed/worker_protocol.h"
#include "distributed/version_compat.h"
#include "nodes/makefuncs.h"
@ -58,6 +59,8 @@
#include "utils/rel.h"
#include "utils/syscache.h"
#define StartsWith(msg, prefix) \
(strncmp(msg, prefix, strlen(prefix)) == 0)
/* Config variable managed via guc.c */
int LimitClauseRowFetchCount = -1; /* number of rows to fetch from each task */
@ -1935,6 +1938,131 @@ MasterAggregateExpression(Aggref *originalAggregate,
newMasterExpression = (Expr *) unionAggregate;
}
else if (aggregateType == AGGREGATE_TDIGEST_COMBINE ||
aggregateType == AGGREGATE_TDIGEST_ADD_DOUBLE)
{
/* tdigest of column */
Oid tdigestType = TDigestExtensionTypeOid(); /* tdigest type */
Oid unionFunctionId = TDigestExtensionAggTDigest1();
int32 tdigestReturnTypeMod = exprTypmod((Node *) originalAggregate);
Oid tdigestTypeCollationId = exprCollation((Node *) originalAggregate);
/* create first argument for tdigest_precentile(tdigest, double) */
Var *tdigestColumn = makeVar(masterTableId, walkerContext->columnId, tdigestType,
tdigestReturnTypeMod, tdigestTypeCollationId,
columnLevelsUp);
TargetEntry *tdigestTargetEntry = makeTargetEntry((Expr *) tdigestColumn,
argumentId,
NULL, false);
walkerContext->columnId++;
/* construct the master tdigest(tdigest) expression */
Aggref *unionAggregate = makeNode(Aggref);
unionAggregate->aggfnoid = unionFunctionId;
unionAggregate->aggtype = originalAggregate->aggtype;
unionAggregate->args = list_make1(tdigestTargetEntry);
unionAggregate->aggkind = AGGKIND_NORMAL;
unionAggregate->aggfilter = NULL;
unionAggregate->aggtranstype = InvalidOid;
unionAggregate->aggargtypes = list_make1_oid(tdigestType);
unionAggregate->aggsplit = AGGSPLIT_SIMPLE;
newMasterExpression = (Expr *) unionAggregate;
}
else if (aggregateType == AGGREGATE_TDIGEST_PERCENTILE_ADD_DOUBLE ||
aggregateType == AGGREGATE_TDIGEST_PERCENTILE_ADD_DOUBLEARRAY ||
aggregateType == AGGREGATE_TDIGEST_PERCENTILE_OF_ADD_DOUBLE ||
aggregateType == AGGREGATE_TDIGEST_PERCENTILE_OF_ADD_DOUBLEARRAY)
{
/* tdigest of column */
Oid tdigestType = TDigestExtensionTypeOid();
Oid unionFunctionId = InvalidOid;
if (aggregateType == AGGREGATE_TDIGEST_PERCENTILE_ADD_DOUBLE)
{
unionFunctionId = TDigestExtensionAggTDigestPercentile2();
}
else if (aggregateType == AGGREGATE_TDIGEST_PERCENTILE_ADD_DOUBLEARRAY)
{
unionFunctionId = TDigestExtensionAggTDigestPercentile2a();
}
else if (aggregateType == AGGREGATE_TDIGEST_PERCENTILE_OF_ADD_DOUBLE)
{
unionFunctionId = TDigestExtensionAggTDigestPercentileOf2();
}
else if (aggregateType == AGGREGATE_TDIGEST_PERCENTILE_OF_ADD_DOUBLEARRAY)
{
unionFunctionId = TDigestExtensionAggTDigestPercentileOf2a();
}
Assert(OidIsValid(unionFunctionId));
int32 tdigestReturnTypeMod = exprTypmod((Node *) originalAggregate);
Oid tdigestTypeCollationId = exprCollation((Node *) originalAggregate);
/* create first argument for tdigest_precentile(tdigest, double) */
Var *tdigestColumn = makeVar(masterTableId, walkerContext->columnId, tdigestType,
tdigestReturnTypeMod, tdigestTypeCollationId,
columnLevelsUp);
TargetEntry *tdigestTargetEntry = makeTargetEntry((Expr *) tdigestColumn,
argumentId, NULL, false);
walkerContext->columnId++;
/* construct the master tdigest_precentile(tdigest, double) expression */
Aggref *unionAggregate = makeNode(Aggref);
unionAggregate->aggfnoid = unionFunctionId;
unionAggregate->aggtype = originalAggregate->aggtype;
unionAggregate->args = list_make2(
tdigestTargetEntry,
list_nth(originalAggregate->args, 2));
unionAggregate->aggkind = AGGKIND_NORMAL;
unionAggregate->aggfilter = NULL;
unionAggregate->aggtranstype = InvalidOid;
unionAggregate->aggargtypes = list_make2_oid(
tdigestType,
list_nth_oid(originalAggregate->aggargtypes, 2));
unionAggregate->aggsplit = AGGSPLIT_SIMPLE;
newMasterExpression = (Expr *) unionAggregate;
}
else if (aggregateType == AGGREGATE_TDIGEST_PERCENTILE_TDIGEST_DOUBLE ||
aggregateType == AGGREGATE_TDIGEST_PERCENTILE_TDIGEST_DOUBLEARRAY ||
aggregateType == AGGREGATE_TDIGEST_PERCENTILE_OF_TDIGEST_DOUBLE ||
aggregateType == AGGREGATE_TDIGEST_PERCENTILE_OF_TDIGEST_DOUBLEARRAY)
{
/* tdigest of column */
Oid tdigestType = TDigestExtensionTypeOid();
/* These functions already will combine the tdigest arguments returned */
Oid unionFunctionId = originalAggregate->aggfnoid;
int32 tdigestReturnTypeMod = exprTypmod((Node *) originalAggregate);
Oid tdigestTypeCollationId = exprCollation((Node *) originalAggregate);
/* create first argument for tdigest_precentile(tdigest, double) */
Var *tdigestColumn = makeVar(masterTableId, walkerContext->columnId, tdigestType,
tdigestReturnTypeMod, tdigestTypeCollationId,
columnLevelsUp);
TargetEntry *tdigestTargetEntry = makeTargetEntry((Expr *) tdigestColumn,
argumentId, NULL, false);
walkerContext->columnId++;
/* construct the master tdigest_precentile(tdigest, double) expression */
Aggref *unionAggregate = makeNode(Aggref);
unionAggregate->aggfnoid = unionFunctionId;
unionAggregate->aggtype = originalAggregate->aggtype;
unionAggregate->args = list_make2(
tdigestTargetEntry,
list_nth(originalAggregate->args, 1));
unionAggregate->aggkind = AGGKIND_NORMAL;
unionAggregate->aggfilter = NULL;
unionAggregate->aggtranstype = InvalidOid;
unionAggregate->aggargtypes = list_make2_oid(
tdigestType,
list_nth_oid(originalAggregate->aggargtypes, 1));
unionAggregate->aggsplit = AGGSPLIT_SIMPLE;
newMasterExpression = (Expr *) unionAggregate;
}
else if (aggregateType == AGGREGATE_CUSTOM_COMBINE)
{
HeapTuple aggTuple =
@ -3048,6 +3176,71 @@ WorkerAggregateExpressionList(Aggref *originalAggregate,
workerAggregateList = lappend(workerAggregateList, sumAggregate);
workerAggregateList = lappend(workerAggregateList, countAggregate);
}
else if (aggregateType == AGGREGATE_TDIGEST_PERCENTILE_ADD_DOUBLE ||
aggregateType == AGGREGATE_TDIGEST_PERCENTILE_ADD_DOUBLEARRAY ||
aggregateType == AGGREGATE_TDIGEST_PERCENTILE_OF_ADD_DOUBLE ||
aggregateType == AGGREGATE_TDIGEST_PERCENTILE_OF_ADD_DOUBLEARRAY)
{
/*
* The original query has an aggregate in the form of either
* - tdigest_percentile(column, compression, quantile)
* - tdigest_percentile(column, compression, quantile[])
* - tdigest_percentile_of(column, compression, value)
* - tdigest_percentile_of(column, compression, value[])
*
* We are creating the worker part of this query by creating a
* - tdigest(column, compression)
*
* One could see we are passing argument 0 and argument 1 from the original query
* in here. This corresponds with the list_nth calls in the args and aggargstypes
* list construction. The tdigest function and type are read from the catalog.
*/
Aggref *newWorkerAggregate = copyObject(originalAggregate);
newWorkerAggregate->aggfnoid = TDigestExtensionAggTDigest2();
newWorkerAggregate->aggtype = TDigestExtensionTypeOid();
newWorkerAggregate->args = list_make2(
list_nth(newWorkerAggregate->args, 0),
list_nth(newWorkerAggregate->args, 1));
newWorkerAggregate->aggkind = AGGKIND_NORMAL;
newWorkerAggregate->aggtranstype = InvalidOid;
newWorkerAggregate->aggargtypes = list_make2_oid(
list_nth_oid(newWorkerAggregate->aggargtypes, 0),
list_nth_oid(newWorkerAggregate->aggargtypes, 1));
newWorkerAggregate->aggsplit = AGGSPLIT_SIMPLE;
workerAggregateList = lappend(workerAggregateList, newWorkerAggregate);
}
else if (aggregateType == AGGREGATE_TDIGEST_PERCENTILE_TDIGEST_DOUBLE ||
aggregateType == AGGREGATE_TDIGEST_PERCENTILE_TDIGEST_DOUBLEARRAY ||
aggregateType == AGGREGATE_TDIGEST_PERCENTILE_OF_TDIGEST_DOUBLE ||
aggregateType == AGGREGATE_TDIGEST_PERCENTILE_OF_TDIGEST_DOUBLEARRAY)
{
/*
* The original query has an aggregate in the form of either
* - tdigest_percentile(tdigest, quantile)
* - tdigest_percentile(tdigest, quantile[])
* - tdigest_percentile_of(tdigest, value)
* - tdigest_percentile_of(tdigest, value[])
*
* We are creating the worker part of this query by creating a
* - tdigest(tdigest)
*
* One could see we are passing argument 0 from the original query in here. This
* corresponds with the list_nth calls in the args and aggargstypes list
* construction. The tdigest function and type are read from the catalog.
*/
Aggref *newWorkerAggregate = copyObject(originalAggregate);
newWorkerAggregate->aggfnoid = TDigestExtensionAggTDigest1();
newWorkerAggregate->aggtype = TDigestExtensionTypeOid();
newWorkerAggregate->args = list_make1(list_nth(newWorkerAggregate->args, 0));
newWorkerAggregate->aggkind = AGGKIND_NORMAL;
newWorkerAggregate->aggtranstype = InvalidOid;
newWorkerAggregate->aggargtypes = list_make1_oid(
list_nth_oid(newWorkerAggregate->aggargtypes, 0));
newWorkerAggregate->aggsplit = AGGSPLIT_SIMPLE;
workerAggregateList = lappend(workerAggregateList, newWorkerAggregate);
}
else if (aggregateType == AGGREGATE_CUSTOM_COMBINE)
{
HeapTuple aggTuple =
@ -3155,6 +3348,66 @@ GetAggregateType(Aggref *aggregateExpression)
}
}
/*
* All functions from github.com/tvondra/tdigest start with the "tdigest" prefix.
* Since it requires lookups of function names in a schema we would like to only
* perform these checks if there is some chance it will actually result in a positive
* hit.
*/
if (StartsWith(aggregateProcName, "tdigest"))
{
if (aggFunctionId == TDigestExtensionAggTDigest1())
{
return AGGREGATE_TDIGEST_COMBINE;
}
if (aggFunctionId == TDigestExtensionAggTDigest2())
{
return AGGREGATE_TDIGEST_ADD_DOUBLE;
}
if (aggFunctionId == TDigestExtensionAggTDigestPercentile3())
{
return AGGREGATE_TDIGEST_PERCENTILE_ADD_DOUBLE;
}
if (aggFunctionId == TDigestExtensionAggTDigestPercentile3a())
{
return AGGREGATE_TDIGEST_PERCENTILE_ADD_DOUBLEARRAY;
}
if (aggFunctionId == TDigestExtensionAggTDigestPercentile2())
{
return AGGREGATE_TDIGEST_PERCENTILE_TDIGEST_DOUBLE;
}
if (aggFunctionId == TDigestExtensionAggTDigestPercentile2a())
{
return AGGREGATE_TDIGEST_PERCENTILE_TDIGEST_DOUBLEARRAY;
}
if (aggFunctionId == TDigestExtensionAggTDigestPercentileOf3())
{
return AGGREGATE_TDIGEST_PERCENTILE_OF_ADD_DOUBLE;
}
if (aggFunctionId == TDigestExtensionAggTDigestPercentileOf3a())
{
return AGGREGATE_TDIGEST_PERCENTILE_OF_ADD_DOUBLEARRAY;
}
if (aggFunctionId == TDigestExtensionAggTDigestPercentileOf2())
{
return AGGREGATE_TDIGEST_PERCENTILE_OF_TDIGEST_DOUBLE;
}
if (aggFunctionId == TDigestExtensionAggTDigestPercentileOf2a())
{
return AGGREGATE_TDIGEST_PERCENTILE_OF_TDIGEST_DOUBLEARRAY;
}
}
if (AggregateEnabledCustom(aggregateExpression))
{
return AGGREGATE_CUSTOM_COMBINE;

View File

@ -0,0 +1,248 @@
/*-------------------------------------------------------------------------
*
* tdigest_extension.c
* Helper functions to get access to tdigest specific data.
*
* Copyright (c) Citus Data, Inc.
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "access/genam.h"
#include "access/htup_details.h"
#include "catalog/pg_extension.h"
#include "catalog/pg_type.h"
#include "distributed/metadata_cache.h"
#include "distributed/tdigest_extension.h"
#include "parser/parse_func.h"
#include "utils/fmgroids.h"
#include "utils/lsyscache.h"
static Oid LookupTDigestFunction(const char *functionName, int argcount, Oid *argtypes);
/*
* TDigestExtensionSchema finds the schema the tdigest extension is installed in. The
* function will return InvalidOid if the extension is not installed.
*/
Oid
TDigestExtensionSchema()
{
ScanKeyData entry[1];
Form_pg_extension extensionForm = NULL;
Oid tdigestExtensionSchema = InvalidOid;
Relation relation = heap_open(ExtensionRelationId, AccessShareLock);
ScanKeyInit(&entry[0],
Anum_pg_extension_extname,
BTEqualStrategyNumber, F_NAMEEQ,
CStringGetDatum("tdigest"));
SysScanDesc scandesc = systable_beginscan(relation, ExtensionNameIndexId, true,
NULL, 1, entry);
HeapTuple extensionTuple = systable_getnext(scandesc);
/*
* We assume that there can be at most one matching tuple, if no tuple found the
* extension is not installed. The value of InvalidOid will not be changed.
*/
if (HeapTupleIsValid(extensionTuple))
{
extensionForm = (Form_pg_extension) GETSTRUCT(extensionTuple);
tdigestExtensionSchema = extensionForm->extnamespace;
Assert(OidIsValid(tdigestExtensionSchema));
}
systable_endscan(scandesc);
heap_close(relation, AccessShareLock);
return tdigestExtensionSchema;
}
/*
* TDigestExtensionTypeOid performs a lookup for the Oid of the type representing the
* tdigest as installed by the tdigest extension returns InvalidOid if the type cannot be
* found.
*/
Oid
TDigestExtensionTypeOid()
{
Oid tdigestSchemaOid = TDigestExtensionSchema();
if (!OidIsValid(tdigestSchemaOid))
{
return InvalidOid;
}
char *namespaceName = get_namespace_name(tdigestSchemaOid);
return LookupTypeOid(namespaceName, "tdigest");
}
/*
* LookupTDigestFunction is a helper function specifically to lookup functions in the
* namespace/schema where the tdigest extension is installed. This makes the lookup of
* following aggregate functions easier and less repetitive.
*/
static Oid
LookupTDigestFunction(const char *functionName, int argcount, Oid *argtypes)
{
Oid tdigestSchemaOid = TDigestExtensionSchema();
if (!OidIsValid(tdigestSchemaOid))
{
return InvalidOid;
}
char *namespaceName = get_namespace_name(tdigestSchemaOid);
return LookupFuncName(
list_make2(makeString(namespaceName), makeString(pstrdup(functionName))),
argcount, argtypes, true);
}
/*
* TDigestExtensionAggTDigest1 performs a lookup for the Oid of the tdigest aggregate;
* tdigest(tdigest)
*
* If the aggregate is not found InvalidOid is returned.
*/
Oid
TDigestExtensionAggTDigest1()
{
return LookupTDigestFunction("tdigest", 1, (Oid[]) { TDigestExtensionTypeOid() });
}
/*
* TDigestExtensionAggTDigest2 performs a lookup for the Oid of the tdigest aggregate;
* tdigest(value double precision, compression int)
*
* If the aggregate is not found InvalidOid is returned.
*/
Oid
TDigestExtensionAggTDigest2()
{
return LookupTDigestFunction("tdigest", 2, (Oid[]) { FLOAT8OID, INT4OID });
}
/*
* TDigestExtensionAggTDigestPercentile2 performs a lookup for the Oid of the tdigest
* aggregate;
* tdigest_percentile(tdigest, double precision)
*
* If the aggregate is not found InvalidOid is returned.
*/
Oid
TDigestExtensionAggTDigestPercentile2()
{
return LookupTDigestFunction("tdigest_percentile", 2,
(Oid[]) { TDigestExtensionTypeOid(), FLOAT8OID });
}
/*
* TDigestExtensionAggTDigestPercentile2a performs a lookup for the Oid of the tdigest
* aggregate;
* tdigest_percentile(tdigest, double precision[])
*
* If the aggregate is not found InvalidOid is returned.
*/
Oid
TDigestExtensionAggTDigestPercentile2a(void)
{
return LookupTDigestFunction("tdigest_percentile", 2,
(Oid[]) { TDigestExtensionTypeOid(), FLOAT8ARRAYOID });
}
/*
* TDigestExtensionAggTDigestPercentile3 performs a lookup for the Oid of the tdigest
* aggregate;
* tdigest_percentile(double precision, int, double precision)
*
* If the aggregate is not found InvalidOid is returned.
*/
Oid
TDigestExtensionAggTDigestPercentile3()
{
return LookupTDigestFunction("tdigest_percentile", 3,
(Oid[]) { FLOAT8OID, INT4OID, FLOAT8OID });
}
/*
* TDigestExtensionAggTDigestPercentile3a performs a lookup for the Oid of the tdigest
* aggregate;
* tdigest_percentile(double precision, int, double precision[])
*
* If the aggregate is not found InvalidOid is returned.
*/
Oid
TDigestExtensionAggTDigestPercentile3a(void)
{
return LookupTDigestFunction("tdigest_percentile", 3,
(Oid[]) { FLOAT8OID, INT4OID, FLOAT8ARRAYOID });
}
/*
* TDigestExtensionAggTDigestPercentileOf2 performs a lookup for the Oid of the tdigest
* aggregate;
* tdigest_percentile_of(tdigest, double precision)
*
* If the aggregate is not found InvalidOid is returned.
*/
Oid
TDigestExtensionAggTDigestPercentileOf2()
{
return LookupTDigestFunction("tdigest_percentile_of", 2,
(Oid[]) { TDigestExtensionTypeOid(), FLOAT8OID });
}
/*
* TDigestExtensionAggTDigestPercentileOf2a performs a lookup for the Oid of the tdigest
* aggregate;
* tdigest_percentile_of(tdigest, double precision[])
*
* If the aggregate is not found InvalidOid is returned.
*/
Oid
TDigestExtensionAggTDigestPercentileOf2a(void)
{
return LookupTDigestFunction("tdigest_percentile_of", 2,
(Oid[]) { TDigestExtensionTypeOid(), FLOAT8ARRAYOID });
}
/*
* TDigestExtensionAggTDigestPercentileOf3 performs a lookup for the Oid of the tdigest
* aggregate;
* tdigest_percentile_of(double precision, int, double precision)
*
* If the aggregate is not found InvalidOid is returned.
*/
Oid
TDigestExtensionAggTDigestPercentileOf3()
{
return LookupTDigestFunction("tdigest_percentile_of", 3,
(Oid[]) { FLOAT8OID, INT4OID, FLOAT8OID });
}
/*
* TDigestExtensionAggTDigestPercentileOf3a performs a lookup for the Oid of the tdigest
* aggregate;
* tdigest_percentile_of(double precision, int, double precision[])
*
* If the aggregate is not found InvalidOid is returned.
*/
Oid
TDigestExtensionAggTDigestPercentileOf3a(void)
{
return LookupTDigestFunction("tdigest_percentile_of", 3,
(Oid[]) { FLOAT8OID, INT4OID, FLOAT8ARRAYOID });
}

View File

@ -194,6 +194,7 @@ extern Oid DistPlacementGroupidIndexId(void);
extern Oid DistObjectPrimaryKeyIndexId(void);
/* type oids */
extern Oid LookupTypeOid(char *schemaNameSting, char *typeNameString);
extern Oid CitusCopyFormatTypeId(void);
/* function oids */
@ -220,5 +221,4 @@ extern char * CitusExtensionOwnerName(void);
extern char * CurrentUserName(void);
extern const char * CurrentDatabaseName(void);
#endif /* METADATA_CACHE_H */

View File

@ -80,9 +80,21 @@ typedef enum
AGGREGATE_TOPN_UNION_AGG = 19,
AGGREGATE_ANY_VALUE = 20,
/* support for github.com/tvondra/tdigest */
AGGREGATE_TDIGEST_COMBINE = 21,
AGGREGATE_TDIGEST_ADD_DOUBLE = 22,
AGGREGATE_TDIGEST_PERCENTILE_ADD_DOUBLE = 23,
AGGREGATE_TDIGEST_PERCENTILE_ADD_DOUBLEARRAY = 24,
AGGREGATE_TDIGEST_PERCENTILE_TDIGEST_DOUBLE = 25,
AGGREGATE_TDIGEST_PERCENTILE_TDIGEST_DOUBLEARRAY = 26,
AGGREGATE_TDIGEST_PERCENTILE_OF_ADD_DOUBLE = 27,
AGGREGATE_TDIGEST_PERCENTILE_OF_ADD_DOUBLEARRAY = 28,
AGGREGATE_TDIGEST_PERCENTILE_OF_TDIGEST_DOUBLE = 29,
AGGREGATE_TDIGEST_PERCENTILE_OF_TDIGEST_DOUBLEARRAY = 30,
/* AGGREGATE_CUSTOM must come last */
AGGREGATE_CUSTOM_COMBINE = 21,
AGGREGATE_CUSTOM_ROW_GATHER = 22,
AGGREGATE_CUSTOM_COMBINE = 31,
AGGREGATE_CUSTOM_ROW_GATHER = 32,
} AggregateType;

View File

@ -0,0 +1,27 @@
/*-------------------------------------------------------------------------
*
* tdigest_extension.c
* Helper functions to get access to tdigest specific data.
*
* Copyright (c) Citus Data, Inc.
*-------------------------------------------------------------------------
*/
#ifndef CITUS_TDIGEST_EXTENSION_H
#define CITUS_TDIGEST_EXTENSION_H
/* tdigest related functions */
extern Oid TDigestExtensionSchema(void);
extern Oid TDigestExtensionTypeOid(void);
extern Oid TDigestExtensionAggTDigest1(void);
extern Oid TDigestExtensionAggTDigest2(void);
extern Oid TDigestExtensionAggTDigestPercentile2(void);
extern Oid TDigestExtensionAggTDigestPercentile2a(void);
extern Oid TDigestExtensionAggTDigestPercentile3(void);
extern Oid TDigestExtensionAggTDigestPercentile3a(void);
extern Oid TDigestExtensionAggTDigestPercentileOf2(void);
extern Oid TDigestExtensionAggTDigestPercentileOf2a(void);
extern Oid TDigestExtensionAggTDigestPercentileOf3(void);
extern Oid TDigestExtensionAggTDigestPercentileOf3a(void);
#endif /* CITUS_TDIGEST_EXTENSION_H */

View File

@ -0,0 +1,626 @@
--
-- TDIGEST_AGGREGATE_SUPPORT
-- test the integration of github.com/tvondra/tdigest aggregates into the citus planner
-- for push down parts of the aggregate to use parallelized execution and reduced data
-- transfer sizes for aggregates not grouped by the distribution column
--
SET citus.next_shard_id TO 20070000;
CREATE SCHEMA tdigest_aggregate_support;
SET search_path TO tdigest_aggregate_support, public;
-- create the tdigest extension when installed
SELECT CASE WHEN COUNT(*) > 0
THEN 'CREATE EXTENSION tdigest WITH SCHEMA public'
ELSE 'SELECT false AS tdigest_present' END
AS create_cmd FROM pg_available_extensions()
WHERE name = 'tdigest'
\gset
:create_cmd;
SET citus.shard_count TO 4;
SET citus.coordinator_aggregation_strategy TO 'disabled'; -- prevent aggregate execution when the aggregate can't be pushed down
CREATE TABLE latencies (a int, b int, latency double precision);
SELECT create_distributed_table('latencies', 'a');
create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT setseed(0.42); -- make the random data inserted deterministic
setseed
---------------------------------------------------------------------
(1 row)
INSERT INTO latencies
SELECT (random()*20)::int AS a,
(random()*20)::int AS b,
random()*10000.0 AS latency
FROM generate_series(1, 10000);
-- explain no grouping to verify partially pushed down for tdigest(value, compression)
EXPLAIN (COSTS OFF, VERBOSE)
SELECT tdigest(latency, 100)
FROM latencies;
QUERY PLAN
---------------------------------------------------------------------
Aggregate
Output: tdigest(remote_scan.tdigest)
-> Custom Scan (Citus Adaptive)
Output: remote_scan.tdigest
Task Count: 4
Tasks Shown: One of 4
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> Aggregate
Output: tdigest(latency, 100)
-> Seq Scan on tdigest_aggregate_support.latencies_20070000 latencies
Output: a, b, latency
(12 rows)
-- explain grouping by distribution column is completely pushed down for tdigest(value, compression)
EXPLAIN (COSTS OFF, VERBOSE)
SELECT a, tdigest(latency, 100)
FROM latencies
GROUP BY a;
QUERY PLAN
---------------------------------------------------------------------
Custom Scan (Citus Adaptive)
Output: remote_scan.a, remote_scan.tdigest
Task Count: 4
Tasks Shown: One of 4
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> HashAggregate
Output: a, tdigest(latency, 100)
Group Key: latencies.a
-> Seq Scan on tdigest_aggregate_support.latencies_20070000 latencies
Output: a, b, latency
(11 rows)
-- explain grouping by non-distribution column is partially pushed down for tdigest(value, compression)
EXPLAIN (COSTS OFF, VERBOSE)
SELECT b, tdigest(latency, 100)
FROM latencies
GROUP BY b;
QUERY PLAN
---------------------------------------------------------------------
HashAggregate
Output: remote_scan.b, tdigest(remote_scan.tdigest)
Group Key: remote_scan.b
-> Custom Scan (Citus Adaptive)
Output: remote_scan.b, remote_scan.tdigest
Task Count: 4
Tasks Shown: One of 4
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> HashAggregate
Output: b, tdigest(latency, 100)
Group Key: latencies.b
-> Seq Scan on tdigest_aggregate_support.latencies_20070000 latencies
Output: a, b, latency
(14 rows)
-- explain no grouping to verify partially pushed down for tdigest_precentile(value, compression, quantile)
EXPLAIN (COSTS OFF, VERBOSE)
SELECT tdigest_percentile(latency, 100, 0.99)
FROM latencies;
QUERY PLAN
---------------------------------------------------------------------
Aggregate
Output: tdigest_percentile(remote_scan.tdigest_percentile, '0.99'::double precision)
-> Custom Scan (Citus Adaptive)
Output: remote_scan.tdigest_percentile
Task Count: 4
Tasks Shown: One of 4
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> Aggregate
Output: tdigest(latency, 100)
-> Seq Scan on tdigest_aggregate_support.latencies_20070000 latencies
Output: a, b, latency
(12 rows)
-- explain grouping by distribution column is completely pushed down for tdigest_precentile(value, compression, quantile)
EXPLAIN (COSTS OFF, VERBOSE)
SELECT a, tdigest_percentile(latency, 100, 0.99)
FROM latencies
GROUP BY a;
QUERY PLAN
---------------------------------------------------------------------
Custom Scan (Citus Adaptive)
Output: remote_scan.a, remote_scan.tdigest_percentile
Task Count: 4
Tasks Shown: One of 4
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> HashAggregate
Output: a, tdigest_percentile(latency, 100, '0.99'::double precision)
Group Key: latencies.a
-> Seq Scan on tdigest_aggregate_support.latencies_20070000 latencies
Output: a, b, latency
(11 rows)
-- explain grouping by non-distribution column is partially pushed down for tdigest_precentile(value, compression, quantile)
EXPLAIN (COSTS OFF, VERBOSE)
SELECT b, tdigest_percentile(latency, 100, 0.99)
FROM latencies
GROUP BY b;
QUERY PLAN
---------------------------------------------------------------------
HashAggregate
Output: remote_scan.b, tdigest_percentile(remote_scan.tdigest_percentile, '0.99'::double precision)
Group Key: remote_scan.b
-> Custom Scan (Citus Adaptive)
Output: remote_scan.b, remote_scan.tdigest_percentile
Task Count: 4
Tasks Shown: One of 4
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> HashAggregate
Output: b, tdigest(latency, 100)
Group Key: latencies.b
-> Seq Scan on tdigest_aggregate_support.latencies_20070000 latencies
Output: a, b, latency
(14 rows)
-- explain no grouping to verify partially pushed down for tdigest_precentile(value, compression, quantiles[])
EXPLAIN (COSTS OFF, VERBOSE)
SELECT tdigest_percentile(latency, 100, ARRAY[0.99, 0.95])
FROM latencies;
QUERY PLAN
---------------------------------------------------------------------
Aggregate
Output: tdigest_percentile(remote_scan.tdigest_percentile, '{0.99,0.95}'::double precision[])
-> Custom Scan (Citus Adaptive)
Output: remote_scan.tdigest_percentile
Task Count: 4
Tasks Shown: One of 4
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> Aggregate
Output: tdigest(latency, 100)
-> Seq Scan on tdigest_aggregate_support.latencies_20070000 latencies
Output: a, b, latency
(12 rows)
-- explain grouping by distribution column is completely pushed down for tdigest_precentile(value, compression, quantiles[])
EXPLAIN (COSTS OFF, VERBOSE)
SELECT a, tdigest_percentile(latency, 100, ARRAY[0.99, 0.95])
FROM latencies
GROUP BY a;
QUERY PLAN
---------------------------------------------------------------------
Custom Scan (Citus Adaptive)
Output: remote_scan.a, remote_scan.tdigest_percentile
Task Count: 4
Tasks Shown: One of 4
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> HashAggregate
Output: a, tdigest_percentile(latency, 100, '{0.99,0.95}'::double precision[])
Group Key: latencies.a
-> Seq Scan on tdigest_aggregate_support.latencies_20070000 latencies
Output: a, b, latency
(11 rows)
-- explain grouping by non-distribution column is partially pushed down for tdigest_precentile(value, compression, quantiles[])
EXPLAIN (COSTS OFF, VERBOSE)
SELECT b, tdigest_percentile(latency, 100, ARRAY[0.99, 0.95])
FROM latencies
GROUP BY b;
QUERY PLAN
---------------------------------------------------------------------
HashAggregate
Output: remote_scan.b, tdigest_percentile(remote_scan.tdigest_percentile, '{0.99,0.95}'::double precision[])
Group Key: remote_scan.b
-> Custom Scan (Citus Adaptive)
Output: remote_scan.b, remote_scan.tdigest_percentile
Task Count: 4
Tasks Shown: One of 4
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> HashAggregate
Output: b, tdigest(latency, 100)
Group Key: latencies.b
-> Seq Scan on tdigest_aggregate_support.latencies_20070000 latencies
Output: a, b, latency
(14 rows)
-- explain no grouping to verify partially pushed down for tdigest_precentile_of(value, compression, hypotetical_value)
EXPLAIN (COSTS OFF, VERBOSE)
SELECT tdigest_percentile_of(latency, 100, 9000)
FROM latencies;
QUERY PLAN
---------------------------------------------------------------------
Aggregate
Output: tdigest_percentile_of(remote_scan.tdigest_percentile_of, '9000'::double precision)
-> Custom Scan (Citus Adaptive)
Output: remote_scan.tdigest_percentile_of
Task Count: 4
Tasks Shown: One of 4
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> Aggregate
Output: tdigest(latency, 100)
-> Seq Scan on tdigest_aggregate_support.latencies_20070000 latencies
Output: a, b, latency
(12 rows)
-- explain grouping by distribution column is completely pushed down for tdigest_precentile_of(value, compression, hypotetical_value)
EXPLAIN (COSTS OFF, VERBOSE)
SELECT a, tdigest_percentile_of(latency, 100, 9000)
FROM latencies
GROUP BY a;
QUERY PLAN
---------------------------------------------------------------------
Custom Scan (Citus Adaptive)
Output: remote_scan.a, remote_scan.tdigest_percentile_of
Task Count: 4
Tasks Shown: One of 4
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> HashAggregate
Output: a, tdigest_percentile_of(latency, 100, '9000'::double precision)
Group Key: latencies.a
-> Seq Scan on tdigest_aggregate_support.latencies_20070000 latencies
Output: a, b, latency
(11 rows)
-- explain grouping by non-distribution column is partially pushed down for tdigest_precentile_of(value, compression, hypotetical_value)
EXPLAIN (COSTS OFF, VERBOSE)
SELECT b, tdigest_percentile_of(latency, 100, 9000)
FROM latencies
GROUP BY b;
QUERY PLAN
---------------------------------------------------------------------
HashAggregate
Output: remote_scan.b, tdigest_percentile_of(remote_scan.tdigest_percentile_of, '9000'::double precision)
Group Key: remote_scan.b
-> Custom Scan (Citus Adaptive)
Output: remote_scan.b, remote_scan.tdigest_percentile_of
Task Count: 4
Tasks Shown: One of 4
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> HashAggregate
Output: b, tdigest(latency, 100)
Group Key: latencies.b
-> Seq Scan on tdigest_aggregate_support.latencies_20070000 latencies
Output: a, b, latency
(14 rows)
-- explain no grouping to verify partially pushed down for tdigest_precentile_of(value, compression, hypotetical_values[])
EXPLAIN (COSTS OFF, VERBOSE)
SELECT tdigest_percentile_of(latency, 100, ARRAY[9000, 9500])
FROM latencies;
QUERY PLAN
---------------------------------------------------------------------
Aggregate
Output: tdigest_percentile_of(remote_scan.tdigest_percentile_of, '{9000,9500}'::double precision[])
-> Custom Scan (Citus Adaptive)
Output: remote_scan.tdigest_percentile_of
Task Count: 4
Tasks Shown: One of 4
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> Aggregate
Output: tdigest(latency, 100)
-> Seq Scan on tdigest_aggregate_support.latencies_20070000 latencies
Output: a, b, latency
(12 rows)
-- explain grouping by distribution column is completely pushed down for tdigest_precentile_of(value, compression, hypotetical_values[])
EXPLAIN (COSTS OFF, VERBOSE)
SELECT a, tdigest_percentile_of(latency, 100, ARRAY[9000, 9500])
FROM latencies
GROUP BY a;
QUERY PLAN
---------------------------------------------------------------------
Custom Scan (Citus Adaptive)
Output: remote_scan.a, remote_scan.tdigest_percentile_of
Task Count: 4
Tasks Shown: One of 4
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> HashAggregate
Output: a, tdigest_percentile_of(latency, 100, '{9000,9500}'::double precision[])
Group Key: latencies.a
-> Seq Scan on tdigest_aggregate_support.latencies_20070000 latencies
Output: a, b, latency
(11 rows)
-- explain grouping by non-distribution column is partially pushed down for tdigest_precentile_of(value, compression, hypotetical_values[])
EXPLAIN (COSTS OFF, VERBOSE)
SELECT b, tdigest_percentile_of(latency, 100, ARRAY[9000, 9500])
FROM latencies
GROUP BY b;
QUERY PLAN
---------------------------------------------------------------------
HashAggregate
Output: remote_scan.b, tdigest_percentile_of(remote_scan.tdigest_percentile_of, '{9000,9500}'::double precision[])
Group Key: remote_scan.b
-> Custom Scan (Citus Adaptive)
Output: remote_scan.b, remote_scan.tdigest_percentile_of
Task Count: 4
Tasks Shown: One of 4
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> HashAggregate
Output: b, tdigest(latency, 100)
Group Key: latencies.b
-> Seq Scan on tdigest_aggregate_support.latencies_20070000 latencies
Output: a, b, latency
(14 rows)
-- verifying results - should be stable due to seed while inserting the data, if failure due to data these queries could be removed or check for certain ranges
SELECT tdigest(latency, 100) FROM latencies;
tdigest
---------------------------------------------------------------------
flags 0 count 10000 compression 100 centroids 46 (0.287235, 1) (1.025106, 1) (2.058216, 1) (5.335597, 1) (12.707263, 2) (25.302479, 3) (43.435063, 4) (77.987860, 5) (269.478664, 10) (509.417419, 13) (1227.158879, 22) (3408.256171, 35) (7772.721988, 55) (13840.275516, 65) (32937.127607, 108) (64476.403332, 148) (118260.230644, 199) (239584.293240, 292) (562119.836766, 463) (944722.686313, 547) (1751089.620493, 749) (3751264.745959, 1128) (5877270.108576, 1300) (6224557.402567, 1104) (5804999.258033, 874) (5632316.697114, 755) (4648651.050740, 573) (3460055.227950, 402) (2820271.404686, 314) (2676501.012955, 288) (1649845.166017, 173) (1269335.942008, 131) (813964.853243, 83) (484144.878702, 49) (337179.763016, 34) (198775.241901, 20) (149353.499704, 15) (109688.319223, 11) (79855.926155, 8) (49937.731689, 5) (29971.046175, 3) (19982.538737, 2) (9991.467422, 1) (9992.337047, 1) (9995.578357, 1) (9999.700339, 1)
(1 row)
SELECT tdigest_percentile(latency, 100, 0.99) FROM latencies;
tdigest_percentile
---------------------------------------------------------------------
9904.28342426494
(1 row)
SELECT tdigest_percentile(latency, 100, ARRAY[0.99, 0.95]) FROM latencies;
tdigest_percentile
---------------------------------------------------------------------
{9904.28342426494,9485.49009399385}
(1 row)
SELECT tdigest_percentile_of(latency, 100, 9000) FROM latencies;
tdigest_percentile_of
---------------------------------------------------------------------
0.903462047211138
(1 row)
SELECT tdigest_percentile_of(latency, 100, ARRAY[9000, 9500]) FROM latencies;
tdigest_percentile_of
---------------------------------------------------------------------
{0.903462047211138,0.95137481812975}
(1 row)
CREATE TABLE latencies_rollup (a int, tdigest tdigest);
SELECT create_distributed_table('latencies_rollup', 'a', colocate_with => 'latencies');
create_distributed_table
---------------------------------------------------------------------
(1 row)
INSERT INTO latencies_rollup
SELECT a, tdigest(latency, 100)
FROM latencies
GROUP BY a;
EXPLAIN (COSTS OFF, VERBOSE)
SELECT tdigest(tdigest)
FROM latencies_rollup;
QUERY PLAN
---------------------------------------------------------------------
Aggregate
Output: tdigest(remote_scan.tdigest)
-> Custom Scan (Citus Adaptive)
Output: remote_scan.tdigest
Task Count: 4
Tasks Shown: One of 4
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> Aggregate
Output: tdigest(tdigest)
-> Seq Scan on tdigest_aggregate_support.latencies_rollup_20070004 latencies_rollup
Output: a, tdigest
(12 rows)
-- explain grouping by distribution column is completely pushed down for tdigest(tdigest)
EXPLAIN (COSTS OFF, VERBOSE)
SELECT a, tdigest(tdigest)
FROM latencies_rollup
GROUP BY a;
QUERY PLAN
---------------------------------------------------------------------
Custom Scan (Citus Adaptive)
Output: remote_scan.a, remote_scan.tdigest
Task Count: 4
Tasks Shown: One of 4
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> HashAggregate
Output: a, tdigest(tdigest)
Group Key: latencies_rollup.a
-> Seq Scan on tdigest_aggregate_support.latencies_rollup_20070004 latencies_rollup
Output: a, tdigest
(11 rows)
-- explain no grouping to verify partially pushed down for tdigest_precentile(tdigest, quantile)
EXPLAIN (COSTS OFF, VERBOSE)
SELECT tdigest_percentile(tdigest, 0.99)
FROM latencies_rollup;
QUERY PLAN
---------------------------------------------------------------------
Aggregate
Output: tdigest_percentile(remote_scan.tdigest_percentile, '0.99'::double precision)
-> Custom Scan (Citus Adaptive)
Output: remote_scan.tdigest_percentile
Task Count: 4
Tasks Shown: One of 4
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> Aggregate
Output: tdigest(tdigest)
-> Seq Scan on tdigest_aggregate_support.latencies_rollup_20070004 latencies_rollup
Output: a, tdigest
(12 rows)
-- explain grouping by distribution column is completely pushed down for tdigest_precentile(tdigest, quantile)
EXPLAIN (COSTS OFF, VERBOSE)
SELECT a, tdigest_percentile(tdigest, 0.99)
FROM latencies_rollup
GROUP BY a;
QUERY PLAN
---------------------------------------------------------------------
Custom Scan (Citus Adaptive)
Output: remote_scan.a, remote_scan.tdigest_percentile
Task Count: 4
Tasks Shown: One of 4
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> HashAggregate
Output: a, tdigest_percentile(tdigest, '0.99'::double precision)
Group Key: latencies_rollup.a
-> Seq Scan on tdigest_aggregate_support.latencies_rollup_20070004 latencies_rollup
Output: a, tdigest
(11 rows)
-- explain no grouping to verify partially pushed down for tdigest_precentile(value, compression, quantiles[])
EXPLAIN (COSTS OFF, VERBOSE)
SELECT tdigest_percentile(tdigest, ARRAY[0.99, 0.95])
FROM latencies_rollup;
QUERY PLAN
---------------------------------------------------------------------
Aggregate
Output: tdigest_percentile(remote_scan.tdigest_percentile, '{0.99,0.95}'::double precision[])
-> Custom Scan (Citus Adaptive)
Output: remote_scan.tdigest_percentile
Task Count: 4
Tasks Shown: One of 4
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> Aggregate
Output: tdigest(tdigest)
-> Seq Scan on tdigest_aggregate_support.latencies_rollup_20070004 latencies_rollup
Output: a, tdigest
(12 rows)
-- explain grouping by distribution column is completely pushed down for tdigest_precentile(value, compression, quantiles[])
EXPLAIN (COSTS OFF, VERBOSE)
SELECT a, tdigest_percentile(tdigest, ARRAY[0.99, 0.95])
FROM latencies_rollup
GROUP BY a;
QUERY PLAN
---------------------------------------------------------------------
Custom Scan (Citus Adaptive)
Output: remote_scan.a, remote_scan.tdigest_percentile
Task Count: 4
Tasks Shown: One of 4
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> HashAggregate
Output: a, tdigest_percentile(tdigest, '{0.99,0.95}'::double precision[])
Group Key: latencies_rollup.a
-> Seq Scan on tdigest_aggregate_support.latencies_rollup_20070004 latencies_rollup
Output: a, tdigest
(11 rows)
-- explain no grouping to verify partially pushed down for tdigest_precentile_of(value, compression, hypotetical_value)
EXPLAIN (COSTS OFF, VERBOSE)
SELECT tdigest_percentile_of(tdigest, 9000)
FROM latencies_rollup;
QUERY PLAN
---------------------------------------------------------------------
Aggregate
Output: tdigest_percentile_of(remote_scan.tdigest_percentile_of, '9000'::double precision)
-> Custom Scan (Citus Adaptive)
Output: remote_scan.tdigest_percentile_of
Task Count: 4
Tasks Shown: One of 4
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> Aggregate
Output: tdigest(tdigest)
-> Seq Scan on tdigest_aggregate_support.latencies_rollup_20070004 latencies_rollup
Output: a, tdigest
(12 rows)
-- explain grouping by distribution column is completely pushed down for tdigest_precentile_of(value, compression, hypotetical_value)
EXPLAIN (COSTS OFF, VERBOSE)
SELECT a, tdigest_percentile_of(tdigest, 9000)
FROM latencies_rollup
GROUP BY a;
QUERY PLAN
---------------------------------------------------------------------
Custom Scan (Citus Adaptive)
Output: remote_scan.a, remote_scan.tdigest_percentile_of
Task Count: 4
Tasks Shown: One of 4
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> HashAggregate
Output: a, tdigest_percentile_of(tdigest, '9000'::double precision)
Group Key: latencies_rollup.a
-> Seq Scan on tdigest_aggregate_support.latencies_rollup_20070004 latencies_rollup
Output: a, tdigest
(11 rows)
-- explain no grouping to verify partially pushed down for tdigest_precentile_of(value, compression, hypotetical_values[])
EXPLAIN (COSTS OFF, VERBOSE)
SELECT tdigest_percentile_of(tdigest, ARRAY[9000, 9500])
FROM latencies_rollup;
QUERY PLAN
---------------------------------------------------------------------
Aggregate
Output: tdigest_percentile_of(remote_scan.tdigest_percentile_of, '{9000,9500}'::double precision[])
-> Custom Scan (Citus Adaptive)
Output: remote_scan.tdigest_percentile_of
Task Count: 4
Tasks Shown: One of 4
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> Aggregate
Output: tdigest(tdigest)
-> Seq Scan on tdigest_aggregate_support.latencies_rollup_20070004 latencies_rollup
Output: a, tdigest
(12 rows)
-- explain grouping by distribution column is completely pushed down for tdigest_precentile_of(value, compression, hypotetical_values[])
EXPLAIN (COSTS OFF, VERBOSE)
SELECT a, tdigest_percentile_of(tdigest, ARRAY[9000, 9500])
FROM latencies_rollup
GROUP BY a;
QUERY PLAN
---------------------------------------------------------------------
Custom Scan (Citus Adaptive)
Output: remote_scan.a, remote_scan.tdigest_percentile_of
Task Count: 4
Tasks Shown: One of 4
-> Task
Node: host=localhost port=xxxxx dbname=regression
-> HashAggregate
Output: a, tdigest_percentile_of(tdigest, '{9000,9500}'::double precision[])
Group Key: latencies_rollup.a
-> Seq Scan on tdigest_aggregate_support.latencies_rollup_20070004 latencies_rollup
Output: a, tdigest
(11 rows)
-- verifying results - should be stable due to seed while inserting the data, if failure due to data these queries could be removed or check for certain ranges
SELECT tdigest(tdigest) FROM latencies_rollup;
tdigest
---------------------------------------------------------------------
flags 0 count 10000 compression 100 centroids 47 (0.287235, 1) (1.025106, 1) (2.058216, 1) (5.335597, 1) (12.707263, 2) (25.302479, 3) (43.435063, 4) (77.987860, 5) (241.681030, 9) (402.696604, 11) (999.675875, 20) (2310.848640, 27) (4374.387978, 37) (9722.896547, 56) (21713.805492, 87) (39735.065966, 112) (87335.860853, 177) (182744.906162, 262) (336766.886786, 338) (661263.339724, 464) (1228663.222377, 623) (2146097.038498, 805) (2854487.701653, 827) (5292830.156590, 1195) (6168185.834602, 1104) (6399734.303813, 966) (5778088.854724, 773) (5213381.984997, 637) (3763042.148296, 431) (3036786.646485, 333) (1948238.134602, 207) (1456568.605821, 152) (999888.715345, 103) (715935.892988, 73) (543464.906535, 55) (327339.982973, 33) (198853.838033, 20) (159362.743852, 16) (79807.827301, 8) (69877.414438, 7) (49937.731689, 5) (29971.046175, 3) (19982.538737, 2) (9991.467422, 1) (9992.337047, 1) (9995.578357, 1) (9999.700339, 1)
(1 row)
SELECT tdigest_percentile(tdigest, 0.99) FROM latencies_rollup;
tdigest_percentile
---------------------------------------------------------------------
9903.76070790358
(1 row)
SELECT tdigest_percentile(tdigest, ARRAY[0.99, 0.95]) FROM latencies_rollup;
tdigest_percentile
---------------------------------------------------------------------
{9903.76070790358,9492.7106302226}
(1 row)
SELECT tdigest_percentile_of(tdigest, 9000) FROM latencies_rollup;
tdigest_percentile_of
---------------------------------------------------------------------
0.902852659582396
(1 row)
SELECT tdigest_percentile_of(tdigest, ARRAY[9000, 9500]) FROM latencies_rollup;
tdigest_percentile_of
---------------------------------------------------------------------
{0.902852659582396,0.950865574659141}
(1 row)
SET client_min_messages TO WARNING; -- suppress cascade messages
DROP SCHEMA tdigest_aggregate_support CASCADE;

View File

@ -0,0 +1,234 @@
--
-- TDIGEST_AGGREGATE_SUPPORT
-- test the integration of github.com/tvondra/tdigest aggregates into the citus planner
-- for push down parts of the aggregate to use parallelized execution and reduced data
-- transfer sizes for aggregates not grouped by the distribution column
--
SET citus.next_shard_id TO 20070000;
CREATE SCHEMA tdigest_aggregate_support;
SET search_path TO tdigest_aggregate_support, public;
-- create the tdigest extension when installed
SELECT CASE WHEN COUNT(*) > 0
THEN 'CREATE EXTENSION tdigest WITH SCHEMA public'
ELSE 'SELECT false AS tdigest_present' END
AS create_cmd FROM pg_available_extensions()
WHERE name = 'tdigest'
\gset
:create_cmd;
tdigest_present
---------------------------------------------------------------------
f
(1 row)
SET citus.shard_count TO 4;
SET citus.coordinator_aggregation_strategy TO 'disabled'; -- prevent aggregate execution when the aggregate can't be pushed down
CREATE TABLE latencies (a int, b int, latency double precision);
SELECT create_distributed_table('latencies', 'a');
create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT setseed(0.42); -- make the random data inserted deterministic
setseed
---------------------------------------------------------------------
(1 row)
INSERT INTO latencies
SELECT (random()*20)::int AS a,
(random()*20)::int AS b,
random()*10000.0 AS latency
FROM generate_series(1, 10000);
-- explain no grouping to verify partially pushed down for tdigest(value, compression)
EXPLAIN (COSTS OFF, VERBOSE)
SELECT tdigest(latency, 100)
FROM latencies;
ERROR: function tdigest(double precision, integer) does not exist
HINT: No function matches the given name and argument types. You might need to add explicit type casts.
-- explain grouping by distribution column is completely pushed down for tdigest(value, compression)
EXPLAIN (COSTS OFF, VERBOSE)
SELECT a, tdigest(latency, 100)
FROM latencies
GROUP BY a;
ERROR: function tdigest(double precision, integer) does not exist
HINT: No function matches the given name and argument types. You might need to add explicit type casts.
-- explain grouping by non-distribution column is partially pushed down for tdigest(value, compression)
EXPLAIN (COSTS OFF, VERBOSE)
SELECT b, tdigest(latency, 100)
FROM latencies
GROUP BY b;
ERROR: function tdigest(double precision, integer) does not exist
HINT: No function matches the given name and argument types. You might need to add explicit type casts.
-- explain no grouping to verify partially pushed down for tdigest_precentile(value, compression, quantile)
EXPLAIN (COSTS OFF, VERBOSE)
SELECT tdigest_percentile(latency, 100, 0.99)
FROM latencies;
ERROR: function tdigest_percentile(double precision, integer, numeric) does not exist
HINT: No function matches the given name and argument types. You might need to add explicit type casts.
-- explain grouping by distribution column is completely pushed down for tdigest_precentile(value, compression, quantile)
EXPLAIN (COSTS OFF, VERBOSE)
SELECT a, tdigest_percentile(latency, 100, 0.99)
FROM latencies
GROUP BY a;
ERROR: function tdigest_percentile(double precision, integer, numeric) does not exist
HINT: No function matches the given name and argument types. You might need to add explicit type casts.
-- explain grouping by non-distribution column is partially pushed down for tdigest_precentile(value, compression, quantile)
EXPLAIN (COSTS OFF, VERBOSE)
SELECT b, tdigest_percentile(latency, 100, 0.99)
FROM latencies
GROUP BY b;
ERROR: function tdigest_percentile(double precision, integer, numeric) does not exist
HINT: No function matches the given name and argument types. You might need to add explicit type casts.
-- explain no grouping to verify partially pushed down for tdigest_precentile(value, compression, quantiles[])
EXPLAIN (COSTS OFF, VERBOSE)
SELECT tdigest_percentile(latency, 100, ARRAY[0.99, 0.95])
FROM latencies;
ERROR: function tdigest_percentile(double precision, integer, numeric[]) does not exist
HINT: No function matches the given name and argument types. You might need to add explicit type casts.
-- explain grouping by distribution column is completely pushed down for tdigest_precentile(value, compression, quantiles[])
EXPLAIN (COSTS OFF, VERBOSE)
SELECT a, tdigest_percentile(latency, 100, ARRAY[0.99, 0.95])
FROM latencies
GROUP BY a;
ERROR: function tdigest_percentile(double precision, integer, numeric[]) does not exist
HINT: No function matches the given name and argument types. You might need to add explicit type casts.
-- explain grouping by non-distribution column is partially pushed down for tdigest_precentile(value, compression, quantiles[])
EXPLAIN (COSTS OFF, VERBOSE)
SELECT b, tdigest_percentile(latency, 100, ARRAY[0.99, 0.95])
FROM latencies
GROUP BY b;
ERROR: function tdigest_percentile(double precision, integer, numeric[]) does not exist
HINT: No function matches the given name and argument types. You might need to add explicit type casts.
-- explain no grouping to verify partially pushed down for tdigest_precentile_of(value, compression, hypotetical_value)
EXPLAIN (COSTS OFF, VERBOSE)
SELECT tdigest_percentile_of(latency, 100, 9000)
FROM latencies;
ERROR: function tdigest_percentile_of(double precision, integer, integer) does not exist
HINT: No function matches the given name and argument types. You might need to add explicit type casts.
-- explain grouping by distribution column is completely pushed down for tdigest_precentile_of(value, compression, hypotetical_value)
EXPLAIN (COSTS OFF, VERBOSE)
SELECT a, tdigest_percentile_of(latency, 100, 9000)
FROM latencies
GROUP BY a;
ERROR: function tdigest_percentile_of(double precision, integer, integer) does not exist
HINT: No function matches the given name and argument types. You might need to add explicit type casts.
-- explain grouping by non-distribution column is partially pushed down for tdigest_precentile_of(value, compression, hypotetical_value)
EXPLAIN (COSTS OFF, VERBOSE)
SELECT b, tdigest_percentile_of(latency, 100, 9000)
FROM latencies
GROUP BY b;
ERROR: function tdigest_percentile_of(double precision, integer, integer) does not exist
HINT: No function matches the given name and argument types. You might need to add explicit type casts.
-- explain no grouping to verify partially pushed down for tdigest_precentile_of(value, compression, hypotetical_values[])
EXPLAIN (COSTS OFF, VERBOSE)
SELECT tdigest_percentile_of(latency, 100, ARRAY[9000, 9500])
FROM latencies;
ERROR: function tdigest_percentile_of(double precision, integer, integer[]) does not exist
HINT: No function matches the given name and argument types. You might need to add explicit type casts.
-- explain grouping by distribution column is completely pushed down for tdigest_precentile_of(value, compression, hypotetical_values[])
EXPLAIN (COSTS OFF, VERBOSE)
SELECT a, tdigest_percentile_of(latency, 100, ARRAY[9000, 9500])
FROM latencies
GROUP BY a;
ERROR: function tdigest_percentile_of(double precision, integer, integer[]) does not exist
HINT: No function matches the given name and argument types. You might need to add explicit type casts.
-- explain grouping by non-distribution column is partially pushed down for tdigest_precentile_of(value, compression, hypotetical_values[])
EXPLAIN (COSTS OFF, VERBOSE)
SELECT b, tdigest_percentile_of(latency, 100, ARRAY[9000, 9500])
FROM latencies
GROUP BY b;
ERROR: function tdigest_percentile_of(double precision, integer, integer[]) does not exist
HINT: No function matches the given name and argument types. You might need to add explicit type casts.
-- verifying results - should be stable due to seed while inserting the data, if failure due to data these queries could be removed or check for certain ranges
SELECT tdigest(latency, 100) FROM latencies;
ERROR: function tdigest(double precision, integer) does not exist
HINT: No function matches the given name and argument types. You might need to add explicit type casts.
SELECT tdigest_percentile(latency, 100, 0.99) FROM latencies;
ERROR: function tdigest_percentile(double precision, integer, numeric) does not exist
HINT: No function matches the given name and argument types. You might need to add explicit type casts.
SELECT tdigest_percentile(latency, 100, ARRAY[0.99, 0.95]) FROM latencies;
ERROR: function tdigest_percentile(double precision, integer, numeric[]) does not exist
HINT: No function matches the given name and argument types. You might need to add explicit type casts.
SELECT tdigest_percentile_of(latency, 100, 9000) FROM latencies;
ERROR: function tdigest_percentile_of(double precision, integer, integer) does not exist
HINT: No function matches the given name and argument types. You might need to add explicit type casts.
SELECT tdigest_percentile_of(latency, 100, ARRAY[9000, 9500]) FROM latencies;
ERROR: function tdigest_percentile_of(double precision, integer, integer[]) does not exist
HINT: No function matches the given name and argument types. You might need to add explicit type casts.
CREATE TABLE latencies_rollup (a int, tdigest tdigest);
ERROR: type "tdigest" does not exist
SELECT create_distributed_table('latencies_rollup', 'a', colocate_with => 'latencies');
ERROR: relation "latencies_rollup" does not exist
INSERT INTO latencies_rollup
SELECT a, tdigest(latency, 100)
FROM latencies
GROUP BY a;
ERROR: relation "latencies_rollup" does not exist
EXPLAIN (COSTS OFF, VERBOSE)
SELECT tdigest(tdigest)
FROM latencies_rollup;
ERROR: relation "latencies_rollup" does not exist
-- explain grouping by distribution column is completely pushed down for tdigest(tdigest)
EXPLAIN (COSTS OFF, VERBOSE)
SELECT a, tdigest(tdigest)
FROM latencies_rollup
GROUP BY a;
ERROR: relation "latencies_rollup" does not exist
-- explain no grouping to verify partially pushed down for tdigest_precentile(tdigest, quantile)
EXPLAIN (COSTS OFF, VERBOSE)
SELECT tdigest_percentile(tdigest, 0.99)
FROM latencies_rollup;
ERROR: relation "latencies_rollup" does not exist
-- explain grouping by distribution column is completely pushed down for tdigest_precentile(tdigest, quantile)
EXPLAIN (COSTS OFF, VERBOSE)
SELECT a, tdigest_percentile(tdigest, 0.99)
FROM latencies_rollup
GROUP BY a;
ERROR: relation "latencies_rollup" does not exist
-- explain no grouping to verify partially pushed down for tdigest_precentile(value, compression, quantiles[])
EXPLAIN (COSTS OFF, VERBOSE)
SELECT tdigest_percentile(tdigest, ARRAY[0.99, 0.95])
FROM latencies_rollup;
ERROR: relation "latencies_rollup" does not exist
-- explain grouping by distribution column is completely pushed down for tdigest_precentile(value, compression, quantiles[])
EXPLAIN (COSTS OFF, VERBOSE)
SELECT a, tdigest_percentile(tdigest, ARRAY[0.99, 0.95])
FROM latencies_rollup
GROUP BY a;
ERROR: relation "latencies_rollup" does not exist
-- explain no grouping to verify partially pushed down for tdigest_precentile_of(value, compression, hypotetical_value)
EXPLAIN (COSTS OFF, VERBOSE)
SELECT tdigest_percentile_of(tdigest, 9000)
FROM latencies_rollup;
ERROR: relation "latencies_rollup" does not exist
-- explain grouping by distribution column is completely pushed down for tdigest_precentile_of(value, compression, hypotetical_value)
EXPLAIN (COSTS OFF, VERBOSE)
SELECT a, tdigest_percentile_of(tdigest, 9000)
FROM latencies_rollup
GROUP BY a;
ERROR: relation "latencies_rollup" does not exist
-- explain no grouping to verify partially pushed down for tdigest_precentile_of(value, compression, hypotetical_values[])
EXPLAIN (COSTS OFF, VERBOSE)
SELECT tdigest_percentile_of(tdigest, ARRAY[9000, 9500])
FROM latencies_rollup;
ERROR: relation "latencies_rollup" does not exist
-- explain grouping by distribution column is completely pushed down for tdigest_precentile_of(value, compression, hypotetical_values[])
EXPLAIN (COSTS OFF, VERBOSE)
SELECT a, tdigest_percentile_of(tdigest, ARRAY[9000, 9500])
FROM latencies_rollup
GROUP BY a;
ERROR: relation "latencies_rollup" does not exist
-- verifying results - should be stable due to seed while inserting the data, if failure due to data these queries could be removed or check for certain ranges
SELECT tdigest(tdigest) FROM latencies_rollup;
ERROR: relation "latencies_rollup" does not exist
SELECT tdigest_percentile(tdigest, 0.99) FROM latencies_rollup;
ERROR: relation "latencies_rollup" does not exist
SELECT tdigest_percentile(tdigest, ARRAY[0.99, 0.95]) FROM latencies_rollup;
ERROR: relation "latencies_rollup" does not exist
SELECT tdigest_percentile_of(tdigest, 9000) FROM latencies_rollup;
ERROR: relation "latencies_rollup" does not exist
SELECT tdigest_percentile_of(tdigest, ARRAY[9000, 9500]) FROM latencies_rollup;
ERROR: relation "latencies_rollup" does not exist
SET client_min_messages TO WARNING; -- suppress cascade messages
DROP SCHEMA tdigest_aggregate_support CASCADE;

View File

@ -92,7 +92,7 @@ test: multi_subquery_in_where_reference_clause full_join adaptive_executor propa
test: multi_subquery_union multi_subquery_in_where_clause multi_subquery_misc
test: multi_agg_distinct multi_agg_approximate_distinct multi_limit_clause_approximate multi_outer_join_reference multi_single_relation_subquery multi_prepare_plsql
test: multi_reference_table multi_select_for_update relation_access_tracking
test: custom_aggregate_support aggregate_support
test: custom_aggregate_support aggregate_support tdigest_aggregate_support
test: multi_average_expression multi_working_columns multi_having_pushdown
test: multi_array_agg multi_limit_clause multi_orderby_limit_pushdown
test: multi_jsonb_agg multi_jsonb_object_agg multi_json_agg multi_json_object_agg bool_agg ch_bench_having chbenchmark_all_queries expression_reference_join

View File

@ -0,0 +1,195 @@
--
-- TDIGEST_AGGREGATE_SUPPORT
-- test the integration of github.com/tvondra/tdigest aggregates into the citus planner
-- for push down parts of the aggregate to use parallelized execution and reduced data
-- transfer sizes for aggregates not grouped by the distribution column
--
SET citus.next_shard_id TO 20070000;
CREATE SCHEMA tdigest_aggregate_support;
SET search_path TO tdigest_aggregate_support, public;
-- create the tdigest extension when installed
SELECT CASE WHEN COUNT(*) > 0
THEN 'CREATE EXTENSION tdigest WITH SCHEMA public'
ELSE 'SELECT false AS tdigest_present' END
AS create_cmd FROM pg_available_extensions()
WHERE name = 'tdigest'
\gset
:create_cmd;
SET citus.shard_count TO 4;
SET citus.coordinator_aggregation_strategy TO 'disabled'; -- prevent aggregate execution when the aggregate can't be pushed down
CREATE TABLE latencies (a int, b int, latency double precision);
SELECT create_distributed_table('latencies', 'a');
SELECT setseed(0.42); -- make the random data inserted deterministic
INSERT INTO latencies
SELECT (random()*20)::int AS a,
(random()*20)::int AS b,
random()*10000.0 AS latency
FROM generate_series(1, 10000);
-- explain no grouping to verify partially pushed down for tdigest(value, compression)
EXPLAIN (COSTS OFF, VERBOSE)
SELECT tdigest(latency, 100)
FROM latencies;
-- explain grouping by distribution column is completely pushed down for tdigest(value, compression)
EXPLAIN (COSTS OFF, VERBOSE)
SELECT a, tdigest(latency, 100)
FROM latencies
GROUP BY a;
-- explain grouping by non-distribution column is partially pushed down for tdigest(value, compression)
EXPLAIN (COSTS OFF, VERBOSE)
SELECT b, tdigest(latency, 100)
FROM latencies
GROUP BY b;
-- explain no grouping to verify partially pushed down for tdigest_precentile(value, compression, quantile)
EXPLAIN (COSTS OFF, VERBOSE)
SELECT tdigest_percentile(latency, 100, 0.99)
FROM latencies;
-- explain grouping by distribution column is completely pushed down for tdigest_precentile(value, compression, quantile)
EXPLAIN (COSTS OFF, VERBOSE)
SELECT a, tdigest_percentile(latency, 100, 0.99)
FROM latencies
GROUP BY a;
-- explain grouping by non-distribution column is partially pushed down for tdigest_precentile(value, compression, quantile)
EXPLAIN (COSTS OFF, VERBOSE)
SELECT b, tdigest_percentile(latency, 100, 0.99)
FROM latencies
GROUP BY b;
-- explain no grouping to verify partially pushed down for tdigest_precentile(value, compression, quantiles[])
EXPLAIN (COSTS OFF, VERBOSE)
SELECT tdigest_percentile(latency, 100, ARRAY[0.99, 0.95])
FROM latencies;
-- explain grouping by distribution column is completely pushed down for tdigest_precentile(value, compression, quantiles[])
EXPLAIN (COSTS OFF, VERBOSE)
SELECT a, tdigest_percentile(latency, 100, ARRAY[0.99, 0.95])
FROM latencies
GROUP BY a;
-- explain grouping by non-distribution column is partially pushed down for tdigest_precentile(value, compression, quantiles[])
EXPLAIN (COSTS OFF, VERBOSE)
SELECT b, tdigest_percentile(latency, 100, ARRAY[0.99, 0.95])
FROM latencies
GROUP BY b;
-- explain no grouping to verify partially pushed down for tdigest_precentile_of(value, compression, hypotetical_value)
EXPLAIN (COSTS OFF, VERBOSE)
SELECT tdigest_percentile_of(latency, 100, 9000)
FROM latencies;
-- explain grouping by distribution column is completely pushed down for tdigest_precentile_of(value, compression, hypotetical_value)
EXPLAIN (COSTS OFF, VERBOSE)
SELECT a, tdigest_percentile_of(latency, 100, 9000)
FROM latencies
GROUP BY a;
-- explain grouping by non-distribution column is partially pushed down for tdigest_precentile_of(value, compression, hypotetical_value)
EXPLAIN (COSTS OFF, VERBOSE)
SELECT b, tdigest_percentile_of(latency, 100, 9000)
FROM latencies
GROUP BY b;
-- explain no grouping to verify partially pushed down for tdigest_precentile_of(value, compression, hypotetical_values[])
EXPLAIN (COSTS OFF, VERBOSE)
SELECT tdigest_percentile_of(latency, 100, ARRAY[9000, 9500])
FROM latencies;
-- explain grouping by distribution column is completely pushed down for tdigest_precentile_of(value, compression, hypotetical_values[])
EXPLAIN (COSTS OFF, VERBOSE)
SELECT a, tdigest_percentile_of(latency, 100, ARRAY[9000, 9500])
FROM latencies
GROUP BY a;
-- explain grouping by non-distribution column is partially pushed down for tdigest_precentile_of(value, compression, hypotetical_values[])
EXPLAIN (COSTS OFF, VERBOSE)
SELECT b, tdigest_percentile_of(latency, 100, ARRAY[9000, 9500])
FROM latencies
GROUP BY b;
-- verifying results - should be stable due to seed while inserting the data, if failure due to data these queries could be removed or check for certain ranges
SELECT tdigest(latency, 100) FROM latencies;
SELECT tdigest_percentile(latency, 100, 0.99) FROM latencies;
SELECT tdigest_percentile(latency, 100, ARRAY[0.99, 0.95]) FROM latencies;
SELECT tdigest_percentile_of(latency, 100, 9000) FROM latencies;
SELECT tdigest_percentile_of(latency, 100, ARRAY[9000, 9500]) FROM latencies;
CREATE TABLE latencies_rollup (a int, tdigest tdigest);
SELECT create_distributed_table('latencies_rollup', 'a', colocate_with => 'latencies');
INSERT INTO latencies_rollup
SELECT a, tdigest(latency, 100)
FROM latencies
GROUP BY a;
EXPLAIN (COSTS OFF, VERBOSE)
SELECT tdigest(tdigest)
FROM latencies_rollup;
-- explain grouping by distribution column is completely pushed down for tdigest(tdigest)
EXPLAIN (COSTS OFF, VERBOSE)
SELECT a, tdigest(tdigest)
FROM latencies_rollup
GROUP BY a;
-- explain no grouping to verify partially pushed down for tdigest_precentile(tdigest, quantile)
EXPLAIN (COSTS OFF, VERBOSE)
SELECT tdigest_percentile(tdigest, 0.99)
FROM latencies_rollup;
-- explain grouping by distribution column is completely pushed down for tdigest_precentile(tdigest, quantile)
EXPLAIN (COSTS OFF, VERBOSE)
SELECT a, tdigest_percentile(tdigest, 0.99)
FROM latencies_rollup
GROUP BY a;
-- explain no grouping to verify partially pushed down for tdigest_precentile(value, compression, quantiles[])
EXPLAIN (COSTS OFF, VERBOSE)
SELECT tdigest_percentile(tdigest, ARRAY[0.99, 0.95])
FROM latencies_rollup;
-- explain grouping by distribution column is completely pushed down for tdigest_precentile(value, compression, quantiles[])
EXPLAIN (COSTS OFF, VERBOSE)
SELECT a, tdigest_percentile(tdigest, ARRAY[0.99, 0.95])
FROM latencies_rollup
GROUP BY a;
-- explain no grouping to verify partially pushed down for tdigest_precentile_of(value, compression, hypotetical_value)
EXPLAIN (COSTS OFF, VERBOSE)
SELECT tdigest_percentile_of(tdigest, 9000)
FROM latencies_rollup;
-- explain grouping by distribution column is completely pushed down for tdigest_precentile_of(value, compression, hypotetical_value)
EXPLAIN (COSTS OFF, VERBOSE)
SELECT a, tdigest_percentile_of(tdigest, 9000)
FROM latencies_rollup
GROUP BY a;
-- explain no grouping to verify partially pushed down for tdigest_precentile_of(value, compression, hypotetical_values[])
EXPLAIN (COSTS OFF, VERBOSE)
SELECT tdigest_percentile_of(tdigest, ARRAY[9000, 9500])
FROM latencies_rollup;
-- explain grouping by distribution column is completely pushed down for tdigest_precentile_of(value, compression, hypotetical_values[])
EXPLAIN (COSTS OFF, VERBOSE)
SELECT a, tdigest_percentile_of(tdigest, ARRAY[9000, 9500])
FROM latencies_rollup
GROUP BY a;
-- verifying results - should be stable due to seed while inserting the data, if failure due to data these queries could be removed or check for certain ranges
SELECT tdigest(tdigest) FROM latencies_rollup;
SELECT tdigest_percentile(tdigest, 0.99) FROM latencies_rollup;
SELECT tdigest_percentile(tdigest, ARRAY[0.99, 0.95]) FROM latencies_rollup;
SELECT tdigest_percentile_of(tdigest, 9000) FROM latencies_rollup;
SELECT tdigest_percentile_of(tdigest, ARRAY[9000, 9500]) FROM latencies_rollup;
SET client_min_messages TO WARNING; -- suppress cascade messages
DROP SCHEMA tdigest_aggregate_support CASCADE;