mirror of https://github.com/citusdata/citus.git
Feature: tdigest aggregate (#3897)
DESCRIPTION: Adds support to partially push down tdigest aggregates from the tdigest extension: https://github.com/tvondra/tdigest. This PR implements the partial pushdown of tdigest calculations when possible. The extension adds a tdigest type which can be combined into the same structure. There are several aggregate functions that can be used to get: a quantile; a list of quantiles; the quantile of a hypothetical value; a list of quantiles for a list of hypothetical values. These functions can work on both plain values and tdigest types. Since we can create tdigest values either by combining them or from a group of values, we can rewrite the aggregates in such a way that most of the computation gets delegated to the shards. This both speeds up the percentile calculations, because the values don't have to be sorted on the coordinator, and makes the transfer size from the shards to the coordinator significantly smaller.
parent
f69037c192
commit
da8f2b0134
|
@ -2442,13 +2442,13 @@ CurrentUserName(void)
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* LookupTypeOid returns the Oid of the "pg_catalog.{typeNameString}" type, or
|
* LookupTypeOid returns the Oid of the "{schemaNameSting}.{typeNameString}" type, or
|
||||||
* InvalidOid if it does not exist.
|
* InvalidOid if it does not exist.
|
||||||
*/
|
*/
|
||||||
static Oid
|
Oid
|
||||||
LookupTypeOid(char *typeNameString)
|
LookupTypeOid(char *schemaNameSting, char *typeNameString)
|
||||||
{
|
{
|
||||||
Value *schemaName = makeString("pg_catalog");
|
Value *schemaName = makeString(schemaNameSting);
|
||||||
Value *typeName = makeString(typeNameString);
|
Value *typeName = makeString(typeNameString);
|
||||||
List *qualifiedName = list_make2(schemaName, typeName);
|
List *qualifiedName = list_make2(schemaName, typeName);
|
||||||
TypeName *enumTypeName = makeTypeNameFromNameList(qualifiedName);
|
TypeName *enumTypeName = makeTypeNameFromNameList(qualifiedName);
|
||||||
|
@ -2480,7 +2480,7 @@ LookupTypeOid(char *typeNameString)
|
||||||
static Oid
|
static Oid
|
||||||
LookupStringEnumValueId(char *enumName, char *valueName)
|
LookupStringEnumValueId(char *enumName, char *valueName)
|
||||||
{
|
{
|
||||||
Oid enumTypeId = LookupTypeOid(enumName);
|
Oid enumTypeId = LookupTypeOid("pg_catalog", enumName);
|
||||||
|
|
||||||
if (enumTypeId == InvalidOid)
|
if (enumTypeId == InvalidOid)
|
||||||
{
|
{
|
||||||
|
|
|
@ -39,6 +39,7 @@
|
||||||
#include "distributed/multi_logical_planner.h"
|
#include "distributed/multi_logical_planner.h"
|
||||||
#include "distributed/multi_physical_planner.h"
|
#include "distributed/multi_physical_planner.h"
|
||||||
#include "distributed/pg_dist_partition.h"
|
#include "distributed/pg_dist_partition.h"
|
||||||
|
#include "distributed/tdigest_extension.h"
|
||||||
#include "distributed/worker_protocol.h"
|
#include "distributed/worker_protocol.h"
|
||||||
#include "distributed/version_compat.h"
|
#include "distributed/version_compat.h"
|
||||||
#include "nodes/makefuncs.h"
|
#include "nodes/makefuncs.h"
|
||||||
|
@ -61,6 +62,8 @@
|
||||||
#include "utils/rel.h"
|
#include "utils/rel.h"
|
||||||
#include "utils/syscache.h"
|
#include "utils/syscache.h"
|
||||||
|
|
||||||
|
#define StartsWith(msg, prefix) \
|
||||||
|
(strncmp(msg, prefix, strlen(prefix)) == 0)
|
||||||
|
|
||||||
/* Config variable managed via guc.c */
|
/* Config variable managed via guc.c */
|
||||||
int LimitClauseRowFetchCount = -1; /* number of rows to fetch from each task */
|
int LimitClauseRowFetchCount = -1; /* number of rows to fetch from each task */
|
||||||
|
@ -1951,6 +1954,131 @@ MasterAggregateExpression(Aggref *originalAggregate,
|
||||||
|
|
||||||
newMasterExpression = (Expr *) unionAggregate;
|
newMasterExpression = (Expr *) unionAggregate;
|
||||||
}
|
}
|
||||||
|
else if (aggregateType == AGGREGATE_TDIGEST_COMBINE ||
|
||||||
|
aggregateType == AGGREGATE_TDIGEST_ADD_DOUBLE)
|
||||||
|
{
|
||||||
|
/* tdigest of column */
|
||||||
|
Oid tdigestType = TDigestExtensionTypeOid(); /* tdigest type */
|
||||||
|
Oid unionFunctionId = TDigestExtensionAggTDigest1();
|
||||||
|
|
||||||
|
int32 tdigestReturnTypeMod = exprTypmod((Node *) originalAggregate);
|
||||||
|
Oid tdigestTypeCollationId = exprCollation((Node *) originalAggregate);
|
||||||
|
|
||||||
|
/* create first argument for tdigest_precentile(tdigest, double) */
|
||||||
|
Var *tdigestColumn = makeVar(masterTableId, walkerContext->columnId, tdigestType,
|
||||||
|
tdigestReturnTypeMod, tdigestTypeCollationId,
|
||||||
|
columnLevelsUp);
|
||||||
|
TargetEntry *tdigestTargetEntry = makeTargetEntry((Expr *) tdigestColumn,
|
||||||
|
argumentId,
|
||||||
|
NULL, false);
|
||||||
|
walkerContext->columnId++;
|
||||||
|
|
||||||
|
/* construct the master tdigest(tdigest) expression */
|
||||||
|
Aggref *unionAggregate = makeNode(Aggref);
|
||||||
|
unionAggregate->aggfnoid = unionFunctionId;
|
||||||
|
unionAggregate->aggtype = originalAggregate->aggtype;
|
||||||
|
unionAggregate->args = list_make1(tdigestTargetEntry);
|
||||||
|
unionAggregate->aggkind = AGGKIND_NORMAL;
|
||||||
|
unionAggregate->aggfilter = NULL;
|
||||||
|
unionAggregate->aggtranstype = InvalidOid;
|
||||||
|
unionAggregate->aggargtypes = list_make1_oid(tdigestType);
|
||||||
|
unionAggregate->aggsplit = AGGSPLIT_SIMPLE;
|
||||||
|
|
||||||
|
newMasterExpression = (Expr *) unionAggregate;
|
||||||
|
}
|
||||||
|
else if (aggregateType == AGGREGATE_TDIGEST_PERCENTILE_ADD_DOUBLE ||
|
||||||
|
aggregateType == AGGREGATE_TDIGEST_PERCENTILE_ADD_DOUBLEARRAY ||
|
||||||
|
aggregateType == AGGREGATE_TDIGEST_PERCENTILE_OF_ADD_DOUBLE ||
|
||||||
|
aggregateType == AGGREGATE_TDIGEST_PERCENTILE_OF_ADD_DOUBLEARRAY)
|
||||||
|
{
|
||||||
|
/* tdigest of column */
|
||||||
|
Oid tdigestType = TDigestExtensionTypeOid();
|
||||||
|
Oid unionFunctionId = InvalidOid;
|
||||||
|
if (aggregateType == AGGREGATE_TDIGEST_PERCENTILE_ADD_DOUBLE)
|
||||||
|
{
|
||||||
|
unionFunctionId = TDigestExtensionAggTDigestPercentile2();
|
||||||
|
}
|
||||||
|
else if (aggregateType == AGGREGATE_TDIGEST_PERCENTILE_ADD_DOUBLEARRAY)
|
||||||
|
{
|
||||||
|
unionFunctionId = TDigestExtensionAggTDigestPercentile2a();
|
||||||
|
}
|
||||||
|
else if (aggregateType == AGGREGATE_TDIGEST_PERCENTILE_OF_ADD_DOUBLE)
|
||||||
|
{
|
||||||
|
unionFunctionId = TDigestExtensionAggTDigestPercentileOf2();
|
||||||
|
}
|
||||||
|
else if (aggregateType == AGGREGATE_TDIGEST_PERCENTILE_OF_ADD_DOUBLEARRAY)
|
||||||
|
{
|
||||||
|
unionFunctionId = TDigestExtensionAggTDigestPercentileOf2a();
|
||||||
|
}
|
||||||
|
Assert(OidIsValid(unionFunctionId));
|
||||||
|
|
||||||
|
int32 tdigestReturnTypeMod = exprTypmod((Node *) originalAggregate);
|
||||||
|
Oid tdigestTypeCollationId = exprCollation((Node *) originalAggregate);
|
||||||
|
|
||||||
|
/* create first argument for tdigest_precentile(tdigest, double) */
|
||||||
|
Var *tdigestColumn = makeVar(masterTableId, walkerContext->columnId, tdigestType,
|
||||||
|
tdigestReturnTypeMod, tdigestTypeCollationId,
|
||||||
|
columnLevelsUp);
|
||||||
|
TargetEntry *tdigestTargetEntry = makeTargetEntry((Expr *) tdigestColumn,
|
||||||
|
argumentId, NULL, false);
|
||||||
|
walkerContext->columnId++;
|
||||||
|
|
||||||
|
/* construct the master tdigest_precentile(tdigest, double) expression */
|
||||||
|
Aggref *unionAggregate = makeNode(Aggref);
|
||||||
|
unionAggregate->aggfnoid = unionFunctionId;
|
||||||
|
unionAggregate->aggtype = originalAggregate->aggtype;
|
||||||
|
unionAggregate->args = list_make2(
|
||||||
|
tdigestTargetEntry,
|
||||||
|
list_nth(originalAggregate->args, 2));
|
||||||
|
unionAggregate->aggkind = AGGKIND_NORMAL;
|
||||||
|
unionAggregate->aggfilter = NULL;
|
||||||
|
unionAggregate->aggtranstype = InvalidOid;
|
||||||
|
unionAggregate->aggargtypes = list_make2_oid(
|
||||||
|
tdigestType,
|
||||||
|
list_nth_oid(originalAggregate->aggargtypes, 2));
|
||||||
|
unionAggregate->aggsplit = AGGSPLIT_SIMPLE;
|
||||||
|
|
||||||
|
newMasterExpression = (Expr *) unionAggregate;
|
||||||
|
}
|
||||||
|
else if (aggregateType == AGGREGATE_TDIGEST_PERCENTILE_TDIGEST_DOUBLE ||
|
||||||
|
aggregateType == AGGREGATE_TDIGEST_PERCENTILE_TDIGEST_DOUBLEARRAY ||
|
||||||
|
aggregateType == AGGREGATE_TDIGEST_PERCENTILE_OF_TDIGEST_DOUBLE ||
|
||||||
|
aggregateType == AGGREGATE_TDIGEST_PERCENTILE_OF_TDIGEST_DOUBLEARRAY)
|
||||||
|
{
|
||||||
|
/* tdigest of column */
|
||||||
|
Oid tdigestType = TDigestExtensionTypeOid();
|
||||||
|
|
||||||
|
/* These functions already will combine the tdigest arguments returned */
|
||||||
|
Oid unionFunctionId = originalAggregate->aggfnoid;
|
||||||
|
|
||||||
|
int32 tdigestReturnTypeMod = exprTypmod((Node *) originalAggregate);
|
||||||
|
Oid tdigestTypeCollationId = exprCollation((Node *) originalAggregate);
|
||||||
|
|
||||||
|
/* create first argument for tdigest_precentile(tdigest, double) */
|
||||||
|
Var *tdigestColumn = makeVar(masterTableId, walkerContext->columnId, tdigestType,
|
||||||
|
tdigestReturnTypeMod, tdigestTypeCollationId,
|
||||||
|
columnLevelsUp);
|
||||||
|
TargetEntry *tdigestTargetEntry = makeTargetEntry((Expr *) tdigestColumn,
|
||||||
|
argumentId, NULL, false);
|
||||||
|
walkerContext->columnId++;
|
||||||
|
|
||||||
|
/* construct the master tdigest_precentile(tdigest, double) expression */
|
||||||
|
Aggref *unionAggregate = makeNode(Aggref);
|
||||||
|
unionAggregate->aggfnoid = unionFunctionId;
|
||||||
|
unionAggregate->aggtype = originalAggregate->aggtype;
|
||||||
|
unionAggregate->args = list_make2(
|
||||||
|
tdigestTargetEntry,
|
||||||
|
list_nth(originalAggregate->args, 1));
|
||||||
|
unionAggregate->aggkind = AGGKIND_NORMAL;
|
||||||
|
unionAggregate->aggfilter = NULL;
|
||||||
|
unionAggregate->aggtranstype = InvalidOid;
|
||||||
|
unionAggregate->aggargtypes = list_make2_oid(
|
||||||
|
tdigestType,
|
||||||
|
list_nth_oid(originalAggregate->aggargtypes, 1));
|
||||||
|
unionAggregate->aggsplit = AGGSPLIT_SIMPLE;
|
||||||
|
|
||||||
|
newMasterExpression = (Expr *) unionAggregate;
|
||||||
|
}
|
||||||
else if (aggregateType == AGGREGATE_CUSTOM_COMBINE)
|
else if (aggregateType == AGGREGATE_CUSTOM_COMBINE)
|
||||||
{
|
{
|
||||||
HeapTuple aggTuple =
|
HeapTuple aggTuple =
|
||||||
|
@ -3079,6 +3207,71 @@ WorkerAggregateExpressionList(Aggref *originalAggregate,
|
||||||
workerAggregateList = lappend(workerAggregateList, sumAggregate);
|
workerAggregateList = lappend(workerAggregateList, sumAggregate);
|
||||||
workerAggregateList = lappend(workerAggregateList, countAggregate);
|
workerAggregateList = lappend(workerAggregateList, countAggregate);
|
||||||
}
|
}
|
||||||
|
else if (aggregateType == AGGREGATE_TDIGEST_PERCENTILE_ADD_DOUBLE ||
|
||||||
|
aggregateType == AGGREGATE_TDIGEST_PERCENTILE_ADD_DOUBLEARRAY ||
|
||||||
|
aggregateType == AGGREGATE_TDIGEST_PERCENTILE_OF_ADD_DOUBLE ||
|
||||||
|
aggregateType == AGGREGATE_TDIGEST_PERCENTILE_OF_ADD_DOUBLEARRAY)
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
* The original query has an aggregate in the form of either
|
||||||
|
* - tdigest_percentile(column, compression, quantile)
|
||||||
|
* - tdigest_percentile(column, compression, quantile[])
|
||||||
|
* - tdigest_percentile_of(column, compression, value)
|
||||||
|
* - tdigest_percentile_of(column, compression, value[])
|
||||||
|
*
|
||||||
|
* We are creating the worker part of this query by creating a
|
||||||
|
* - tdigest(column, compression)
|
||||||
|
*
|
||||||
|
* One could see we are passing argument 0 and argument 1 from the original query
|
||||||
|
* in here. This corresponds with the list_nth calls in the args and aggargstypes
|
||||||
|
* list construction. The tdigest function and type are read from the catalog.
|
||||||
|
*/
|
||||||
|
Aggref *newWorkerAggregate = copyObject(originalAggregate);
|
||||||
|
newWorkerAggregate->aggfnoid = TDigestExtensionAggTDigest2();
|
||||||
|
newWorkerAggregate->aggtype = TDigestExtensionTypeOid();
|
||||||
|
newWorkerAggregate->args = list_make2(
|
||||||
|
list_nth(newWorkerAggregate->args, 0),
|
||||||
|
list_nth(newWorkerAggregate->args, 1));
|
||||||
|
newWorkerAggregate->aggkind = AGGKIND_NORMAL;
|
||||||
|
newWorkerAggregate->aggtranstype = InvalidOid;
|
||||||
|
newWorkerAggregate->aggargtypes = list_make2_oid(
|
||||||
|
list_nth_oid(newWorkerAggregate->aggargtypes, 0),
|
||||||
|
list_nth_oid(newWorkerAggregate->aggargtypes, 1));
|
||||||
|
newWorkerAggregate->aggsplit = AGGSPLIT_SIMPLE;
|
||||||
|
|
||||||
|
workerAggregateList = lappend(workerAggregateList, newWorkerAggregate);
|
||||||
|
}
|
||||||
|
else if (aggregateType == AGGREGATE_TDIGEST_PERCENTILE_TDIGEST_DOUBLE ||
|
||||||
|
aggregateType == AGGREGATE_TDIGEST_PERCENTILE_TDIGEST_DOUBLEARRAY ||
|
||||||
|
aggregateType == AGGREGATE_TDIGEST_PERCENTILE_OF_TDIGEST_DOUBLE ||
|
||||||
|
aggregateType == AGGREGATE_TDIGEST_PERCENTILE_OF_TDIGEST_DOUBLEARRAY)
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
* The original query has an aggregate in the form of either
|
||||||
|
* - tdigest_percentile(tdigest, quantile)
|
||||||
|
* - tdigest_percentile(tdigest, quantile[])
|
||||||
|
* - tdigest_percentile_of(tdigest, value)
|
||||||
|
* - tdigest_percentile_of(tdigest, value[])
|
||||||
|
*
|
||||||
|
* We are creating the worker part of this query by creating a
|
||||||
|
* - tdigest(tdigest)
|
||||||
|
*
|
||||||
|
* One could see we are passing argument 0 from the original query in here. This
|
||||||
|
* corresponds with the list_nth calls in the args and aggargstypes list
|
||||||
|
* construction. The tdigest function and type are read from the catalog.
|
||||||
|
*/
|
||||||
|
Aggref *newWorkerAggregate = copyObject(originalAggregate);
|
||||||
|
newWorkerAggregate->aggfnoid = TDigestExtensionAggTDigest1();
|
||||||
|
newWorkerAggregate->aggtype = TDigestExtensionTypeOid();
|
||||||
|
newWorkerAggregate->args = list_make1(list_nth(newWorkerAggregate->args, 0));
|
||||||
|
newWorkerAggregate->aggkind = AGGKIND_NORMAL;
|
||||||
|
newWorkerAggregate->aggtranstype = InvalidOid;
|
||||||
|
newWorkerAggregate->aggargtypes = list_make1_oid(
|
||||||
|
list_nth_oid(newWorkerAggregate->aggargtypes, 0));
|
||||||
|
newWorkerAggregate->aggsplit = AGGSPLIT_SIMPLE;
|
||||||
|
|
||||||
|
workerAggregateList = lappend(workerAggregateList, newWorkerAggregate);
|
||||||
|
}
|
||||||
else if (aggregateType == AGGREGATE_CUSTOM_COMBINE)
|
else if (aggregateType == AGGREGATE_CUSTOM_COMBINE)
|
||||||
{
|
{
|
||||||
HeapTuple aggTuple =
|
HeapTuple aggTuple =
|
||||||
|
@ -3178,6 +3371,66 @@ GetAggregateType(Aggref *aggregateExpression)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* All functions from github.com/tvondra/tdigest start with the "tdigest" prefix.
|
||||||
|
* Since it requires lookups of function names in a schema we would like to only
|
||||||
|
* perform these checks if there is some chance it will actually result in a positive
|
||||||
|
* hit.
|
||||||
|
*/
|
||||||
|
if (StartsWith(aggregateProcName, "tdigest"))
|
||||||
|
{
|
||||||
|
if (aggFunctionId == TDigestExtensionAggTDigest1())
|
||||||
|
{
|
||||||
|
return AGGREGATE_TDIGEST_COMBINE;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (aggFunctionId == TDigestExtensionAggTDigest2())
|
||||||
|
{
|
||||||
|
return AGGREGATE_TDIGEST_ADD_DOUBLE;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (aggFunctionId == TDigestExtensionAggTDigestPercentile3())
|
||||||
|
{
|
||||||
|
return AGGREGATE_TDIGEST_PERCENTILE_ADD_DOUBLE;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (aggFunctionId == TDigestExtensionAggTDigestPercentile3a())
|
||||||
|
{
|
||||||
|
return AGGREGATE_TDIGEST_PERCENTILE_ADD_DOUBLEARRAY;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (aggFunctionId == TDigestExtensionAggTDigestPercentile2())
|
||||||
|
{
|
||||||
|
return AGGREGATE_TDIGEST_PERCENTILE_TDIGEST_DOUBLE;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (aggFunctionId == TDigestExtensionAggTDigestPercentile2a())
|
||||||
|
{
|
||||||
|
return AGGREGATE_TDIGEST_PERCENTILE_TDIGEST_DOUBLEARRAY;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (aggFunctionId == TDigestExtensionAggTDigestPercentileOf3())
|
||||||
|
{
|
||||||
|
return AGGREGATE_TDIGEST_PERCENTILE_OF_ADD_DOUBLE;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (aggFunctionId == TDigestExtensionAggTDigestPercentileOf3a())
|
||||||
|
{
|
||||||
|
return AGGREGATE_TDIGEST_PERCENTILE_OF_ADD_DOUBLEARRAY;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (aggFunctionId == TDigestExtensionAggTDigestPercentileOf2())
|
||||||
|
{
|
||||||
|
return AGGREGATE_TDIGEST_PERCENTILE_OF_TDIGEST_DOUBLE;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (aggFunctionId == TDigestExtensionAggTDigestPercentileOf2a())
|
||||||
|
{
|
||||||
|
return AGGREGATE_TDIGEST_PERCENTILE_OF_TDIGEST_DOUBLEARRAY;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
if (AggregateEnabledCustom(aggregateExpression))
|
if (AggregateEnabledCustom(aggregateExpression))
|
||||||
{
|
{
|
||||||
return AGGREGATE_CUSTOM_COMBINE;
|
return AGGREGATE_CUSTOM_COMBINE;
|
||||||
|
|
|
@ -0,0 +1,248 @@
|
||||||
|
/*-------------------------------------------------------------------------
|
||||||
|
*
|
||||||
|
* tdigest_extension.c
|
||||||
|
* Helper functions to get access to tdigest specific data.
|
||||||
|
*
|
||||||
|
* Copyright (c) Citus Data, Inc.
|
||||||
|
*-------------------------------------------------------------------------
|
||||||
|
*/
|
||||||
|
#include "postgres.h"
|
||||||
|
|
||||||
|
#include "access/genam.h"
|
||||||
|
#include "access/htup_details.h"
|
||||||
|
#include "catalog/pg_extension.h"
|
||||||
|
#include "catalog/pg_type.h"
|
||||||
|
#include "distributed/metadata_cache.h"
|
||||||
|
#include "distributed/tdigest_extension.h"
|
||||||
|
#include "parser/parse_func.h"
|
||||||
|
#include "utils/fmgroids.h"
|
||||||
|
#include "utils/lsyscache.h"
|
||||||
|
|
||||||
|
|
||||||
|
static Oid LookupTDigestFunction(const char *functionName, int argcount, Oid *argtypes);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* TDigestExtensionSchema finds the schema the tdigest extension is installed in. The
|
||||||
|
* function will return InvalidOid if the extension is not installed.
|
||||||
|
*/
|
||||||
|
Oid
|
||||||
|
TDigestExtensionSchema()
|
||||||
|
{
|
||||||
|
ScanKeyData entry[1];
|
||||||
|
Form_pg_extension extensionForm = NULL;
|
||||||
|
Oid tdigestExtensionSchema = InvalidOid;
|
||||||
|
|
||||||
|
Relation relation = heap_open(ExtensionRelationId, AccessShareLock);
|
||||||
|
|
||||||
|
ScanKeyInit(&entry[0],
|
||||||
|
Anum_pg_extension_extname,
|
||||||
|
BTEqualStrategyNumber, F_NAMEEQ,
|
||||||
|
CStringGetDatum("tdigest"));
|
||||||
|
|
||||||
|
SysScanDesc scandesc = systable_beginscan(relation, ExtensionNameIndexId, true,
|
||||||
|
NULL, 1, entry);
|
||||||
|
|
||||||
|
HeapTuple extensionTuple = systable_getnext(scandesc);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* We assume that there can be at most one matching tuple, if no tuple found the
|
||||||
|
* extension is not installed. The value of InvalidOid will not be changed.
|
||||||
|
*/
|
||||||
|
if (HeapTupleIsValid(extensionTuple))
|
||||||
|
{
|
||||||
|
extensionForm = (Form_pg_extension) GETSTRUCT(extensionTuple);
|
||||||
|
tdigestExtensionSchema = extensionForm->extnamespace;
|
||||||
|
Assert(OidIsValid(tdigestExtensionSchema));
|
||||||
|
}
|
||||||
|
|
||||||
|
systable_endscan(scandesc);
|
||||||
|
|
||||||
|
heap_close(relation, AccessShareLock);
|
||||||
|
|
||||||
|
return tdigestExtensionSchema;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* TDigestExtensionTypeOid performs a lookup for the Oid of the type representing the
|
||||||
|
* tdigest as installed by the tdigest extension returns InvalidOid if the type cannot be
|
||||||
|
* found.
|
||||||
|
*/
|
||||||
|
Oid
|
||||||
|
TDigestExtensionTypeOid()
|
||||||
|
{
|
||||||
|
Oid tdigestSchemaOid = TDigestExtensionSchema();
|
||||||
|
if (!OidIsValid(tdigestSchemaOid))
|
||||||
|
{
|
||||||
|
return InvalidOid;
|
||||||
|
}
|
||||||
|
char *namespaceName = get_namespace_name(tdigestSchemaOid);
|
||||||
|
return LookupTypeOid(namespaceName, "tdigest");
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* LookupTDigestFunction is a helper function specifically to lookup functions in the
|
||||||
|
* namespace/schema where the tdigest extension is installed. This makes the lookup of
|
||||||
|
* following aggregate functions easier and less repetitive.
|
||||||
|
*/
|
||||||
|
static Oid
|
||||||
|
LookupTDigestFunction(const char *functionName, int argcount, Oid *argtypes)
|
||||||
|
{
|
||||||
|
Oid tdigestSchemaOid = TDigestExtensionSchema();
|
||||||
|
if (!OidIsValid(tdigestSchemaOid))
|
||||||
|
{
|
||||||
|
return InvalidOid;
|
||||||
|
}
|
||||||
|
|
||||||
|
char *namespaceName = get_namespace_name(tdigestSchemaOid);
|
||||||
|
return LookupFuncName(
|
||||||
|
list_make2(makeString(namespaceName), makeString(pstrdup(functionName))),
|
||||||
|
argcount, argtypes, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* TDigestExtensionAggTDigest1 performs a lookup for the Oid of the tdigest aggregate;
|
||||||
|
* tdigest(tdigest)
|
||||||
|
*
|
||||||
|
* If the aggregate is not found InvalidOid is returned.
|
||||||
|
*/
|
||||||
|
Oid
|
||||||
|
TDigestExtensionAggTDigest1()
|
||||||
|
{
|
||||||
|
return LookupTDigestFunction("tdigest", 1, (Oid[]) { TDigestExtensionTypeOid() });
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* TDigestExtensionAggTDigest2 performs a lookup for the Oid of the tdigest aggregate;
|
||||||
|
* tdigest(value double precision, compression int)
|
||||||
|
*
|
||||||
|
* If the aggregate is not found InvalidOid is returned.
|
||||||
|
*/
|
||||||
|
Oid
|
||||||
|
TDigestExtensionAggTDigest2()
|
||||||
|
{
|
||||||
|
return LookupTDigestFunction("tdigest", 2, (Oid[]) { FLOAT8OID, INT4OID });
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* TDigestExtensionAggTDigestPercentile2 performs a lookup for the Oid of the tdigest
|
||||||
|
* aggregate;
|
||||||
|
* tdigest_percentile(tdigest, double precision)
|
||||||
|
*
|
||||||
|
* If the aggregate is not found InvalidOid is returned.
|
||||||
|
*/
|
||||||
|
Oid
|
||||||
|
TDigestExtensionAggTDigestPercentile2()
|
||||||
|
{
|
||||||
|
return LookupTDigestFunction("tdigest_percentile", 2,
|
||||||
|
(Oid[]) { TDigestExtensionTypeOid(), FLOAT8OID });
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* TDigestExtensionAggTDigestPercentile2a performs a lookup for the Oid of the tdigest
|
||||||
|
* aggregate;
|
||||||
|
* tdigest_percentile(tdigest, double precision[])
|
||||||
|
*
|
||||||
|
* If the aggregate is not found InvalidOid is returned.
|
||||||
|
*/
|
||||||
|
Oid
|
||||||
|
TDigestExtensionAggTDigestPercentile2a(void)
|
||||||
|
{
|
||||||
|
return LookupTDigestFunction("tdigest_percentile", 2,
|
||||||
|
(Oid[]) { TDigestExtensionTypeOid(), FLOAT8ARRAYOID });
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* TDigestExtensionAggTDigestPercentile3 performs a lookup for the Oid of the tdigest
|
||||||
|
* aggregate;
|
||||||
|
* tdigest_percentile(double precision, int, double precision)
|
||||||
|
*
|
||||||
|
* If the aggregate is not found InvalidOid is returned.
|
||||||
|
*/
|
||||||
|
Oid
|
||||||
|
TDigestExtensionAggTDigestPercentile3()
|
||||||
|
{
|
||||||
|
return LookupTDigestFunction("tdigest_percentile", 3,
|
||||||
|
(Oid[]) { FLOAT8OID, INT4OID, FLOAT8OID });
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* TDigestExtensionAggTDigestPercentile3a performs a lookup for the Oid of the tdigest
|
||||||
|
* aggregate;
|
||||||
|
* tdigest_percentile(double precision, int, double precision[])
|
||||||
|
*
|
||||||
|
* If the aggregate is not found InvalidOid is returned.
|
||||||
|
*/
|
||||||
|
Oid
|
||||||
|
TDigestExtensionAggTDigestPercentile3a(void)
|
||||||
|
{
|
||||||
|
return LookupTDigestFunction("tdigest_percentile", 3,
|
||||||
|
(Oid[]) { FLOAT8OID, INT4OID, FLOAT8ARRAYOID });
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* TDigestExtensionAggTDigestPercentileOf2 performs a lookup for the Oid of the tdigest
|
||||||
|
* aggregate;
|
||||||
|
* tdigest_percentile_of(tdigest, double precision)
|
||||||
|
*
|
||||||
|
* If the aggregate is not found InvalidOid is returned.
|
||||||
|
*/
|
||||||
|
Oid
|
||||||
|
TDigestExtensionAggTDigestPercentileOf2()
|
||||||
|
{
|
||||||
|
return LookupTDigestFunction("tdigest_percentile_of", 2,
|
||||||
|
(Oid[]) { TDigestExtensionTypeOid(), FLOAT8OID });
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* TDigestExtensionAggTDigestPercentileOf2a performs a lookup for the Oid of the tdigest
|
||||||
|
* aggregate;
|
||||||
|
* tdigest_percentile_of(tdigest, double precision[])
|
||||||
|
*
|
||||||
|
* If the aggregate is not found InvalidOid is returned.
|
||||||
|
*/
|
||||||
|
Oid
|
||||||
|
TDigestExtensionAggTDigestPercentileOf2a(void)
|
||||||
|
{
|
||||||
|
return LookupTDigestFunction("tdigest_percentile_of", 2,
|
||||||
|
(Oid[]) { TDigestExtensionTypeOid(), FLOAT8ARRAYOID });
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* TDigestExtensionAggTDigestPercentileOf3 performs a lookup for the Oid of the tdigest
|
||||||
|
* aggregate;
|
||||||
|
* tdigest_percentile_of(double precision, int, double precision)
|
||||||
|
*
|
||||||
|
* If the aggregate is not found InvalidOid is returned.
|
||||||
|
*/
|
||||||
|
Oid
|
||||||
|
TDigestExtensionAggTDigestPercentileOf3()
|
||||||
|
{
|
||||||
|
return LookupTDigestFunction("tdigest_percentile_of", 3,
|
||||||
|
(Oid[]) { FLOAT8OID, INT4OID, FLOAT8OID });
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* TDigestExtensionAggTDigestPercentileOf3a performs a lookup for the Oid of the tdigest
|
||||||
|
* aggregate;
|
||||||
|
* tdigest_percentile_of(double precision, int, double precision[])
|
||||||
|
*
|
||||||
|
* If the aggregate is not found InvalidOid is returned.
|
||||||
|
*/
|
||||||
|
Oid
|
||||||
|
TDigestExtensionAggTDigestPercentileOf3a(void)
|
||||||
|
{
|
||||||
|
return LookupTDigestFunction("tdigest_percentile_of", 3,
|
||||||
|
(Oid[]) { FLOAT8OID, INT4OID, FLOAT8ARRAYOID });
|
||||||
|
}
|
|
@ -194,6 +194,7 @@ extern Oid DistPlacementGroupidIndexId(void);
|
||||||
extern Oid DistObjectPrimaryKeyIndexId(void);
|
extern Oid DistObjectPrimaryKeyIndexId(void);
|
||||||
|
|
||||||
/* type oids */
|
/* type oids */
|
||||||
|
extern Oid LookupTypeOid(char *schemaNameSting, char *typeNameString);
|
||||||
extern Oid CitusCopyFormatTypeId(void);
|
extern Oid CitusCopyFormatTypeId(void);
|
||||||
|
|
||||||
/* function oids */
|
/* function oids */
|
||||||
|
@ -221,5 +222,4 @@ extern char * CitusExtensionOwnerName(void);
|
||||||
extern char * CurrentUserName(void);
|
extern char * CurrentUserName(void);
|
||||||
extern const char * CurrentDatabaseName(void);
|
extern const char * CurrentDatabaseName(void);
|
||||||
|
|
||||||
|
|
||||||
#endif /* METADATA_CACHE_H */
|
#endif /* METADATA_CACHE_H */
|
||||||
|
|
|
@ -80,9 +80,21 @@ typedef enum
|
||||||
AGGREGATE_TOPN_UNION_AGG = 19,
|
AGGREGATE_TOPN_UNION_AGG = 19,
|
||||||
AGGREGATE_ANY_VALUE = 20,
|
AGGREGATE_ANY_VALUE = 20,
|
||||||
|
|
||||||
|
/* support for github.com/tvondra/tdigest */
|
||||||
|
AGGREGATE_TDIGEST_COMBINE = 21,
|
||||||
|
AGGREGATE_TDIGEST_ADD_DOUBLE = 22,
|
||||||
|
AGGREGATE_TDIGEST_PERCENTILE_ADD_DOUBLE = 23,
|
||||||
|
AGGREGATE_TDIGEST_PERCENTILE_ADD_DOUBLEARRAY = 24,
|
||||||
|
AGGREGATE_TDIGEST_PERCENTILE_TDIGEST_DOUBLE = 25,
|
||||||
|
AGGREGATE_TDIGEST_PERCENTILE_TDIGEST_DOUBLEARRAY = 26,
|
||||||
|
AGGREGATE_TDIGEST_PERCENTILE_OF_ADD_DOUBLE = 27,
|
||||||
|
AGGREGATE_TDIGEST_PERCENTILE_OF_ADD_DOUBLEARRAY = 28,
|
||||||
|
AGGREGATE_TDIGEST_PERCENTILE_OF_TDIGEST_DOUBLE = 29,
|
||||||
|
AGGREGATE_TDIGEST_PERCENTILE_OF_TDIGEST_DOUBLEARRAY = 30,
|
||||||
|
|
||||||
/* AGGREGATE_CUSTOM must come last */
|
/* AGGREGATE_CUSTOM must come last */
|
||||||
AGGREGATE_CUSTOM_COMBINE = 21,
|
AGGREGATE_CUSTOM_COMBINE = 31,
|
||||||
AGGREGATE_CUSTOM_ROW_GATHER = 22,
|
AGGREGATE_CUSTOM_ROW_GATHER = 32,
|
||||||
} AggregateType;
|
} AggregateType;
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,27 @@
|
||||||
|
/*-------------------------------------------------------------------------
|
||||||
|
*
|
||||||
|
* tdigest_extension.c
|
||||||
|
* Helper functions to get access to tdigest specific data.
|
||||||
|
*
|
||||||
|
* Copyright (c) Citus Data, Inc.
|
||||||
|
*-------------------------------------------------------------------------
|
||||||
|
*/
|
||||||
|
|
||||||
|
#ifndef CITUS_TDIGEST_EXTENSION_H
|
||||||
|
#define CITUS_TDIGEST_EXTENSION_H
|
||||||
|
|
||||||
|
/* tdigest related functions */
|
||||||
|
extern Oid TDigestExtensionSchema(void);
|
||||||
|
extern Oid TDigestExtensionTypeOid(void);
|
||||||
|
extern Oid TDigestExtensionAggTDigest1(void);
|
||||||
|
extern Oid TDigestExtensionAggTDigest2(void);
|
||||||
|
extern Oid TDigestExtensionAggTDigestPercentile2(void);
|
||||||
|
extern Oid TDigestExtensionAggTDigestPercentile2a(void);
|
||||||
|
extern Oid TDigestExtensionAggTDigestPercentile3(void);
|
||||||
|
extern Oid TDigestExtensionAggTDigestPercentile3a(void);
|
||||||
|
extern Oid TDigestExtensionAggTDigestPercentileOf2(void);
|
||||||
|
extern Oid TDigestExtensionAggTDigestPercentileOf2a(void);
|
||||||
|
extern Oid TDigestExtensionAggTDigestPercentileOf3(void);
|
||||||
|
extern Oid TDigestExtensionAggTDigestPercentileOf3a(void);
|
||||||
|
|
||||||
|
#endif /* CITUS_TDIGEST_EXTENSION_H */
|
|
@ -0,0 +1,626 @@
|
||||||
|
--
|
||||||
|
-- TDIGEST_AGGREGATE_SUPPORT
|
||||||
|
-- test the integration of github.com/tvondra/tdigest aggregates into the citus planner
|
||||||
|
-- for push down parts of the aggregate to use parallelized execution and reduced data
|
||||||
|
-- transfer sizes for aggregates not grouped by the distribution column
|
||||||
|
--
|
||||||
|
SET citus.next_shard_id TO 20070000;
|
||||||
|
CREATE SCHEMA tdigest_aggregate_support;
|
||||||
|
SET search_path TO tdigest_aggregate_support, public;
|
||||||
|
-- create the tdigest extension when installed
|
||||||
|
SELECT CASE WHEN COUNT(*) > 0
|
||||||
|
THEN 'CREATE EXTENSION tdigest WITH SCHEMA public'
|
||||||
|
ELSE 'SELECT false AS tdigest_present' END
|
||||||
|
AS create_cmd FROM pg_available_extensions()
|
||||||
|
WHERE name = 'tdigest'
|
||||||
|
\gset
|
||||||
|
:create_cmd;
|
||||||
|
SET citus.shard_count TO 4;
|
||||||
|
SET citus.coordinator_aggregation_strategy TO 'disabled'; -- prevent aggregate execution when the aggregate can't be pushed down
|
||||||
|
CREATE TABLE latencies (a int, b int, latency double precision);
|
||||||
|
SELECT create_distributed_table('latencies', 'a');
|
||||||
|
create_distributed_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT setseed(0.42); -- make the random data inserted deterministic
|
||||||
|
setseed
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
INSERT INTO latencies
|
||||||
|
SELECT (random()*20)::int AS a,
|
||||||
|
(random()*20)::int AS b,
|
||||||
|
random()*10000.0 AS latency
|
||||||
|
FROM generate_series(1, 10000);
|
||||||
|
-- explain no grouping to verify partially pushed down for tdigest(value, compression)
|
||||||
|
EXPLAIN (COSTS OFF, VERBOSE)
|
||||||
|
SELECT tdigest(latency, 100)
|
||||||
|
FROM latencies;
|
||||||
|
QUERY PLAN
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
Aggregate
|
||||||
|
Output: tdigest(remote_scan.tdigest)
|
||||||
|
-> Custom Scan (Citus Adaptive)
|
||||||
|
Output: remote_scan.tdigest
|
||||||
|
Task Count: 4
|
||||||
|
Tasks Shown: One of 4
|
||||||
|
-> Task
|
||||||
|
Node: host=localhost port=xxxxx dbname=regression
|
||||||
|
-> Aggregate
|
||||||
|
Output: tdigest(latency, 100)
|
||||||
|
-> Seq Scan on tdigest_aggregate_support.latencies_20070000 latencies
|
||||||
|
Output: a, b, latency
|
||||||
|
(12 rows)
|
||||||
|
|
||||||
|
-- explain grouping by distribution column is completely pushed down for tdigest(value, compression)
|
||||||
|
EXPLAIN (COSTS OFF, VERBOSE)
|
||||||
|
SELECT a, tdigest(latency, 100)
|
||||||
|
FROM latencies
|
||||||
|
GROUP BY a;
|
||||||
|
QUERY PLAN
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
Custom Scan (Citus Adaptive)
|
||||||
|
Output: remote_scan.a, remote_scan.tdigest
|
||||||
|
Task Count: 4
|
||||||
|
Tasks Shown: One of 4
|
||||||
|
-> Task
|
||||||
|
Node: host=localhost port=xxxxx dbname=regression
|
||||||
|
-> HashAggregate
|
||||||
|
Output: a, tdigest(latency, 100)
|
||||||
|
Group Key: latencies.a
|
||||||
|
-> Seq Scan on tdigest_aggregate_support.latencies_20070000 latencies
|
||||||
|
Output: a, b, latency
|
||||||
|
(11 rows)
|
||||||
|
|
||||||
|
-- explain grouping by non-distribution column is partially pushed down for tdigest(value, compression)
|
||||||
|
EXPLAIN (COSTS OFF, VERBOSE)
|
||||||
|
SELECT b, tdigest(latency, 100)
|
||||||
|
FROM latencies
|
||||||
|
GROUP BY b;
|
||||||
|
QUERY PLAN
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
HashAggregate
|
||||||
|
Output: remote_scan.b, tdigest(remote_scan.tdigest)
|
||||||
|
Group Key: remote_scan.b
|
||||||
|
-> Custom Scan (Citus Adaptive)
|
||||||
|
Output: remote_scan.b, remote_scan.tdigest
|
||||||
|
Task Count: 4
|
||||||
|
Tasks Shown: One of 4
|
||||||
|
-> Task
|
||||||
|
Node: host=localhost port=xxxxx dbname=regression
|
||||||
|
-> HashAggregate
|
||||||
|
Output: b, tdigest(latency, 100)
|
||||||
|
Group Key: latencies.b
|
||||||
|
-> Seq Scan on tdigest_aggregate_support.latencies_20070000 latencies
|
||||||
|
Output: a, b, latency
|
||||||
|
(14 rows)
|
||||||
|
|
||||||
|
-- explain no grouping to verify partially pushed down for tdigest_precentile(value, compression, quantile)
|
||||||
|
EXPLAIN (COSTS OFF, VERBOSE)
|
||||||
|
SELECT tdigest_percentile(latency, 100, 0.99)
|
||||||
|
FROM latencies;
|
||||||
|
QUERY PLAN
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
Aggregate
|
||||||
|
Output: tdigest_percentile(remote_scan.tdigest_percentile, '0.99'::double precision)
|
||||||
|
-> Custom Scan (Citus Adaptive)
|
||||||
|
Output: remote_scan.tdigest_percentile
|
||||||
|
Task Count: 4
|
||||||
|
Tasks Shown: One of 4
|
||||||
|
-> Task
|
||||||
|
Node: host=localhost port=xxxxx dbname=regression
|
||||||
|
-> Aggregate
|
||||||
|
Output: tdigest(latency, 100)
|
||||||
|
-> Seq Scan on tdigest_aggregate_support.latencies_20070000 latencies
|
||||||
|
Output: a, b, latency
|
||||||
|
(12 rows)
|
||||||
|
|
||||||
|
-- explain grouping by distribution column is completely pushed down for tdigest_precentile(value, compression, quantile)
|
||||||
|
EXPLAIN (COSTS OFF, VERBOSE)
|
||||||
|
SELECT a, tdigest_percentile(latency, 100, 0.99)
|
||||||
|
FROM latencies
|
||||||
|
GROUP BY a;
|
||||||
|
QUERY PLAN
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
Custom Scan (Citus Adaptive)
|
||||||
|
Output: remote_scan.a, remote_scan.tdigest_percentile
|
||||||
|
Task Count: 4
|
||||||
|
Tasks Shown: One of 4
|
||||||
|
-> Task
|
||||||
|
Node: host=localhost port=xxxxx dbname=regression
|
||||||
|
-> HashAggregate
|
||||||
|
Output: a, tdigest_percentile(latency, 100, '0.99'::double precision)
|
||||||
|
Group Key: latencies.a
|
||||||
|
-> Seq Scan on tdigest_aggregate_support.latencies_20070000 latencies
|
||||||
|
Output: a, b, latency
|
||||||
|
(11 rows)
|
||||||
|
|
||||||
|
-- explain grouping by non-distribution column is partially pushed down for tdigest_precentile(value, compression, quantile)
|
||||||
|
EXPLAIN (COSTS OFF, VERBOSE)
|
||||||
|
SELECT b, tdigest_percentile(latency, 100, 0.99)
|
||||||
|
FROM latencies
|
||||||
|
GROUP BY b;
|
||||||
|
QUERY PLAN
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
HashAggregate
|
||||||
|
Output: remote_scan.b, tdigest_percentile(remote_scan.tdigest_percentile, '0.99'::double precision)
|
||||||
|
Group Key: remote_scan.b
|
||||||
|
-> Custom Scan (Citus Adaptive)
|
||||||
|
Output: remote_scan.b, remote_scan.tdigest_percentile
|
||||||
|
Task Count: 4
|
||||||
|
Tasks Shown: One of 4
|
||||||
|
-> Task
|
||||||
|
Node: host=localhost port=xxxxx dbname=regression
|
||||||
|
-> HashAggregate
|
||||||
|
Output: b, tdigest(latency, 100)
|
||||||
|
Group Key: latencies.b
|
||||||
|
-> Seq Scan on tdigest_aggregate_support.latencies_20070000 latencies
|
||||||
|
Output: a, b, latency
|
||||||
|
(14 rows)
|
||||||
|
|
||||||
|
-- explain no grouping to verify partially pushed down for tdigest_precentile(value, compression, quantiles[])
|
||||||
|
EXPLAIN (COSTS OFF, VERBOSE)
|
||||||
|
SELECT tdigest_percentile(latency, 100, ARRAY[0.99, 0.95])
|
||||||
|
FROM latencies;
|
||||||
|
QUERY PLAN
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
Aggregate
|
||||||
|
Output: tdigest_percentile(remote_scan.tdigest_percentile, '{0.99,0.95}'::double precision[])
|
||||||
|
-> Custom Scan (Citus Adaptive)
|
||||||
|
Output: remote_scan.tdigest_percentile
|
||||||
|
Task Count: 4
|
||||||
|
Tasks Shown: One of 4
|
||||||
|
-> Task
|
||||||
|
Node: host=localhost port=xxxxx dbname=regression
|
||||||
|
-> Aggregate
|
||||||
|
Output: tdigest(latency, 100)
|
||||||
|
-> Seq Scan on tdigest_aggregate_support.latencies_20070000 latencies
|
||||||
|
Output: a, b, latency
|
||||||
|
(12 rows)
|
||||||
|
|
||||||
|
-- explain grouping by distribution column is completely pushed down for tdigest_precentile(value, compression, quantiles[])
|
||||||
|
EXPLAIN (COSTS OFF, VERBOSE)
|
||||||
|
SELECT a, tdigest_percentile(latency, 100, ARRAY[0.99, 0.95])
|
||||||
|
FROM latencies
|
||||||
|
GROUP BY a;
|
||||||
|
QUERY PLAN
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
Custom Scan (Citus Adaptive)
|
||||||
|
Output: remote_scan.a, remote_scan.tdigest_percentile
|
||||||
|
Task Count: 4
|
||||||
|
Tasks Shown: One of 4
|
||||||
|
-> Task
|
||||||
|
Node: host=localhost port=xxxxx dbname=regression
|
||||||
|
-> HashAggregate
|
||||||
|
Output: a, tdigest_percentile(latency, 100, '{0.99,0.95}'::double precision[])
|
||||||
|
Group Key: latencies.a
|
||||||
|
-> Seq Scan on tdigest_aggregate_support.latencies_20070000 latencies
|
||||||
|
Output: a, b, latency
|
||||||
|
(11 rows)
|
||||||
|
|
||||||
|
-- explain grouping by non-distribution column is partially pushed down for tdigest_precentile(value, compression, quantiles[])
|
||||||
|
EXPLAIN (COSTS OFF, VERBOSE)
|
||||||
|
SELECT b, tdigest_percentile(latency, 100, ARRAY[0.99, 0.95])
|
||||||
|
FROM latencies
|
||||||
|
GROUP BY b;
|
||||||
|
QUERY PLAN
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
HashAggregate
|
||||||
|
Output: remote_scan.b, tdigest_percentile(remote_scan.tdigest_percentile, '{0.99,0.95}'::double precision[])
|
||||||
|
Group Key: remote_scan.b
|
||||||
|
-> Custom Scan (Citus Adaptive)
|
||||||
|
Output: remote_scan.b, remote_scan.tdigest_percentile
|
||||||
|
Task Count: 4
|
||||||
|
Tasks Shown: One of 4
|
||||||
|
-> Task
|
||||||
|
Node: host=localhost port=xxxxx dbname=regression
|
||||||
|
-> HashAggregate
|
||||||
|
Output: b, tdigest(latency, 100)
|
||||||
|
Group Key: latencies.b
|
||||||
|
-> Seq Scan on tdigest_aggregate_support.latencies_20070000 latencies
|
||||||
|
Output: a, b, latency
|
||||||
|
(14 rows)
|
||||||
|
|
||||||
|
-- explain no grouping to verify partially pushed down for tdigest_precentile_of(value, compression, hypotetical_value)
|
||||||
|
EXPLAIN (COSTS OFF, VERBOSE)
|
||||||
|
SELECT tdigest_percentile_of(latency, 100, 9000)
|
||||||
|
FROM latencies;
|
||||||
|
QUERY PLAN
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
Aggregate
|
||||||
|
Output: tdigest_percentile_of(remote_scan.tdigest_percentile_of, '9000'::double precision)
|
||||||
|
-> Custom Scan (Citus Adaptive)
|
||||||
|
Output: remote_scan.tdigest_percentile_of
|
||||||
|
Task Count: 4
|
||||||
|
Tasks Shown: One of 4
|
||||||
|
-> Task
|
||||||
|
Node: host=localhost port=xxxxx dbname=regression
|
||||||
|
-> Aggregate
|
||||||
|
Output: tdigest(latency, 100)
|
||||||
|
-> Seq Scan on tdigest_aggregate_support.latencies_20070000 latencies
|
||||||
|
Output: a, b, latency
|
||||||
|
(12 rows)
|
||||||
|
|
||||||
|
-- explain grouping by distribution column is completely pushed down for tdigest_precentile_of(value, compression, hypotetical_value)
|
||||||
|
EXPLAIN (COSTS OFF, VERBOSE)
|
||||||
|
SELECT a, tdigest_percentile_of(latency, 100, 9000)
|
||||||
|
FROM latencies
|
||||||
|
GROUP BY a;
|
||||||
|
QUERY PLAN
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
Custom Scan (Citus Adaptive)
|
||||||
|
Output: remote_scan.a, remote_scan.tdigest_percentile_of
|
||||||
|
Task Count: 4
|
||||||
|
Tasks Shown: One of 4
|
||||||
|
-> Task
|
||||||
|
Node: host=localhost port=xxxxx dbname=regression
|
||||||
|
-> HashAggregate
|
||||||
|
Output: a, tdigest_percentile_of(latency, 100, '9000'::double precision)
|
||||||
|
Group Key: latencies.a
|
||||||
|
-> Seq Scan on tdigest_aggregate_support.latencies_20070000 latencies
|
||||||
|
Output: a, b, latency
|
||||||
|
(11 rows)
|
||||||
|
|
||||||
|
-- explain grouping by non-distribution column is partially pushed down for tdigest_precentile_of(value, compression, hypotetical_value)
|
||||||
|
EXPLAIN (COSTS OFF, VERBOSE)
|
||||||
|
SELECT b, tdigest_percentile_of(latency, 100, 9000)
|
||||||
|
FROM latencies
|
||||||
|
GROUP BY b;
|
||||||
|
QUERY PLAN
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
HashAggregate
|
||||||
|
Output: remote_scan.b, tdigest_percentile_of(remote_scan.tdigest_percentile_of, '9000'::double precision)
|
||||||
|
Group Key: remote_scan.b
|
||||||
|
-> Custom Scan (Citus Adaptive)
|
||||||
|
Output: remote_scan.b, remote_scan.tdigest_percentile_of
|
||||||
|
Task Count: 4
|
||||||
|
Tasks Shown: One of 4
|
||||||
|
-> Task
|
||||||
|
Node: host=localhost port=xxxxx dbname=regression
|
||||||
|
-> HashAggregate
|
||||||
|
Output: b, tdigest(latency, 100)
|
||||||
|
Group Key: latencies.b
|
||||||
|
-> Seq Scan on tdigest_aggregate_support.latencies_20070000 latencies
|
||||||
|
Output: a, b, latency
|
||||||
|
(14 rows)
|
||||||
|
|
||||||
|
-- explain no grouping to verify partially pushed down for tdigest_precentile_of(value, compression, hypotetical_values[])
|
||||||
|
EXPLAIN (COSTS OFF, VERBOSE)
|
||||||
|
SELECT tdigest_percentile_of(latency, 100, ARRAY[9000, 9500])
|
||||||
|
FROM latencies;
|
||||||
|
QUERY PLAN
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
Aggregate
|
||||||
|
Output: tdigest_percentile_of(remote_scan.tdigest_percentile_of, '{9000,9500}'::double precision[])
|
||||||
|
-> Custom Scan (Citus Adaptive)
|
||||||
|
Output: remote_scan.tdigest_percentile_of
|
||||||
|
Task Count: 4
|
||||||
|
Tasks Shown: One of 4
|
||||||
|
-> Task
|
||||||
|
Node: host=localhost port=xxxxx dbname=regression
|
||||||
|
-> Aggregate
|
||||||
|
Output: tdigest(latency, 100)
|
||||||
|
-> Seq Scan on tdigest_aggregate_support.latencies_20070000 latencies
|
||||||
|
Output: a, b, latency
|
||||||
|
(12 rows)
|
||||||
|
|
||||||
|
-- explain grouping by distribution column is completely pushed down for tdigest_precentile_of(value, compression, hypotetical_values[])
|
||||||
|
EXPLAIN (COSTS OFF, VERBOSE)
|
||||||
|
SELECT a, tdigest_percentile_of(latency, 100, ARRAY[9000, 9500])
|
||||||
|
FROM latencies
|
||||||
|
GROUP BY a;
|
||||||
|
QUERY PLAN
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
Custom Scan (Citus Adaptive)
|
||||||
|
Output: remote_scan.a, remote_scan.tdigest_percentile_of
|
||||||
|
Task Count: 4
|
||||||
|
Tasks Shown: One of 4
|
||||||
|
-> Task
|
||||||
|
Node: host=localhost port=xxxxx dbname=regression
|
||||||
|
-> HashAggregate
|
||||||
|
Output: a, tdigest_percentile_of(latency, 100, '{9000,9500}'::double precision[])
|
||||||
|
Group Key: latencies.a
|
||||||
|
-> Seq Scan on tdigest_aggregate_support.latencies_20070000 latencies
|
||||||
|
Output: a, b, latency
|
||||||
|
(11 rows)
|
||||||
|
|
||||||
|
-- explain grouping by non-distribution column is partially pushed down for tdigest_precentile_of(value, compression, hypotetical_values[])
|
||||||
|
EXPLAIN (COSTS OFF, VERBOSE)
|
||||||
|
SELECT b, tdigest_percentile_of(latency, 100, ARRAY[9000, 9500])
|
||||||
|
FROM latencies
|
||||||
|
GROUP BY b;
|
||||||
|
QUERY PLAN
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
HashAggregate
|
||||||
|
Output: remote_scan.b, tdigest_percentile_of(remote_scan.tdigest_percentile_of, '{9000,9500}'::double precision[])
|
||||||
|
Group Key: remote_scan.b
|
||||||
|
-> Custom Scan (Citus Adaptive)
|
||||||
|
Output: remote_scan.b, remote_scan.tdigest_percentile_of
|
||||||
|
Task Count: 4
|
||||||
|
Tasks Shown: One of 4
|
||||||
|
-> Task
|
||||||
|
Node: host=localhost port=xxxxx dbname=regression
|
||||||
|
-> HashAggregate
|
||||||
|
Output: b, tdigest(latency, 100)
|
||||||
|
Group Key: latencies.b
|
||||||
|
-> Seq Scan on tdigest_aggregate_support.latencies_20070000 latencies
|
||||||
|
Output: a, b, latency
|
||||||
|
(14 rows)
|
||||||
|
|
||||||
|
-- verifying results - should be stable due to seed while inserting the data, if failure due to data these queries could be removed or check for certain ranges
|
||||||
|
SELECT tdigest(latency, 100) FROM latencies;
|
||||||
|
tdigest
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
flags 0 count 10000 compression 100 centroids 46 (0.287235, 1) (1.025106, 1) (2.058216, 1) (5.335597, 1) (12.707263, 2) (25.302479, 3) (43.435063, 4) (77.987860, 5) (269.478664, 10) (509.417419, 13) (1227.158879, 22) (3408.256171, 35) (7772.721988, 55) (13840.275516, 65) (32937.127607, 108) (64476.403332, 148) (118260.230644, 199) (239584.293240, 292) (562119.836766, 463) (944722.686313, 547) (1751089.620493, 749) (3751264.745959, 1128) (5877270.108576, 1300) (6224557.402567, 1104) (5804999.258033, 874) (5632316.697114, 755) (4648651.050740, 573) (3460055.227950, 402) (2820271.404686, 314) (2676501.012955, 288) (1649845.166017, 173) (1269335.942008, 131) (813964.853243, 83) (484144.878702, 49) (337179.763016, 34) (198775.241901, 20) (149353.499704, 15) (109688.319223, 11) (79855.926155, 8) (49937.731689, 5) (29971.046175, 3) (19982.538737, 2) (9991.467422, 1) (9992.337047, 1) (9995.578357, 1) (9999.700339, 1)
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT tdigest_percentile(latency, 100, 0.99) FROM latencies;
|
||||||
|
tdigest_percentile
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
9904.28342426494
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT tdigest_percentile(latency, 100, ARRAY[0.99, 0.95]) FROM latencies;
|
||||||
|
tdigest_percentile
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
{9904.28342426494,9485.49009399385}
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT tdigest_percentile_of(latency, 100, 9000) FROM latencies;
|
||||||
|
tdigest_percentile_of
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
0.903462047211138
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT tdigest_percentile_of(latency, 100, ARRAY[9000, 9500]) FROM latencies;
|
||||||
|
tdigest_percentile_of
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
{0.903462047211138,0.95137481812975}
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
CREATE TABLE latencies_rollup (a int, tdigest tdigest);
|
||||||
|
SELECT create_distributed_table('latencies_rollup', 'a', colocate_with => 'latencies');
|
||||||
|
create_distributed_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
INSERT INTO latencies_rollup
|
||||||
|
SELECT a, tdigest(latency, 100)
|
||||||
|
FROM latencies
|
||||||
|
GROUP BY a;
|
||||||
|
EXPLAIN (COSTS OFF, VERBOSE)
|
||||||
|
SELECT tdigest(tdigest)
|
||||||
|
FROM latencies_rollup;
|
||||||
|
QUERY PLAN
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
Aggregate
|
||||||
|
Output: tdigest(remote_scan.tdigest)
|
||||||
|
-> Custom Scan (Citus Adaptive)
|
||||||
|
Output: remote_scan.tdigest
|
||||||
|
Task Count: 4
|
||||||
|
Tasks Shown: One of 4
|
||||||
|
-> Task
|
||||||
|
Node: host=localhost port=xxxxx dbname=regression
|
||||||
|
-> Aggregate
|
||||||
|
Output: tdigest(tdigest)
|
||||||
|
-> Seq Scan on tdigest_aggregate_support.latencies_rollup_20070004 latencies_rollup
|
||||||
|
Output: a, tdigest
|
||||||
|
(12 rows)
|
||||||
|
|
||||||
|
-- explain grouping by distribution column is completely pushed down for tdigest(tdigest)
|
||||||
|
EXPLAIN (COSTS OFF, VERBOSE)
|
||||||
|
SELECT a, tdigest(tdigest)
|
||||||
|
FROM latencies_rollup
|
||||||
|
GROUP BY a;
|
||||||
|
QUERY PLAN
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
Custom Scan (Citus Adaptive)
|
||||||
|
Output: remote_scan.a, remote_scan.tdigest
|
||||||
|
Task Count: 4
|
||||||
|
Tasks Shown: One of 4
|
||||||
|
-> Task
|
||||||
|
Node: host=localhost port=xxxxx dbname=regression
|
||||||
|
-> HashAggregate
|
||||||
|
Output: a, tdigest(tdigest)
|
||||||
|
Group Key: latencies_rollup.a
|
||||||
|
-> Seq Scan on tdigest_aggregate_support.latencies_rollup_20070004 latencies_rollup
|
||||||
|
Output: a, tdigest
|
||||||
|
(11 rows)
|
||||||
|
|
||||||
|
-- explain no grouping to verify partially pushed down for tdigest_precentile(tdigest, quantile)
|
||||||
|
EXPLAIN (COSTS OFF, VERBOSE)
|
||||||
|
SELECT tdigest_percentile(tdigest, 0.99)
|
||||||
|
FROM latencies_rollup;
|
||||||
|
QUERY PLAN
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
Aggregate
|
||||||
|
Output: tdigest_percentile(remote_scan.tdigest_percentile, '0.99'::double precision)
|
||||||
|
-> Custom Scan (Citus Adaptive)
|
||||||
|
Output: remote_scan.tdigest_percentile
|
||||||
|
Task Count: 4
|
||||||
|
Tasks Shown: One of 4
|
||||||
|
-> Task
|
||||||
|
Node: host=localhost port=xxxxx dbname=regression
|
||||||
|
-> Aggregate
|
||||||
|
Output: tdigest(tdigest)
|
||||||
|
-> Seq Scan on tdigest_aggregate_support.latencies_rollup_20070004 latencies_rollup
|
||||||
|
Output: a, tdigest
|
||||||
|
(12 rows)
|
||||||
|
|
||||||
|
-- explain grouping by distribution column is completely pushed down for tdigest_precentile(tdigest, quantile)
|
||||||
|
EXPLAIN (COSTS OFF, VERBOSE)
|
||||||
|
SELECT a, tdigest_percentile(tdigest, 0.99)
|
||||||
|
FROM latencies_rollup
|
||||||
|
GROUP BY a;
|
||||||
|
QUERY PLAN
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
Custom Scan (Citus Adaptive)
|
||||||
|
Output: remote_scan.a, remote_scan.tdigest_percentile
|
||||||
|
Task Count: 4
|
||||||
|
Tasks Shown: One of 4
|
||||||
|
-> Task
|
||||||
|
Node: host=localhost port=xxxxx dbname=regression
|
||||||
|
-> HashAggregate
|
||||||
|
Output: a, tdigest_percentile(tdigest, '0.99'::double precision)
|
||||||
|
Group Key: latencies_rollup.a
|
||||||
|
-> Seq Scan on tdigest_aggregate_support.latencies_rollup_20070004 latencies_rollup
|
||||||
|
Output: a, tdigest
|
||||||
|
(11 rows)
|
||||||
|
|
||||||
|
-- explain no grouping to verify partially pushed down for tdigest_precentile(value, compression, quantiles[])
|
||||||
|
EXPLAIN (COSTS OFF, VERBOSE)
|
||||||
|
SELECT tdigest_percentile(tdigest, ARRAY[0.99, 0.95])
|
||||||
|
FROM latencies_rollup;
|
||||||
|
QUERY PLAN
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
Aggregate
|
||||||
|
Output: tdigest_percentile(remote_scan.tdigest_percentile, '{0.99,0.95}'::double precision[])
|
||||||
|
-> Custom Scan (Citus Adaptive)
|
||||||
|
Output: remote_scan.tdigest_percentile
|
||||||
|
Task Count: 4
|
||||||
|
Tasks Shown: One of 4
|
||||||
|
-> Task
|
||||||
|
Node: host=localhost port=xxxxx dbname=regression
|
||||||
|
-> Aggregate
|
||||||
|
Output: tdigest(tdigest)
|
||||||
|
-> Seq Scan on tdigest_aggregate_support.latencies_rollup_20070004 latencies_rollup
|
||||||
|
Output: a, tdigest
|
||||||
|
(12 rows)
|
||||||
|
|
||||||
|
-- explain grouping by distribution column is completely pushed down for tdigest_precentile(value, compression, quantiles[])
|
||||||
|
EXPLAIN (COSTS OFF, VERBOSE)
|
||||||
|
SELECT a, tdigest_percentile(tdigest, ARRAY[0.99, 0.95])
|
||||||
|
FROM latencies_rollup
|
||||||
|
GROUP BY a;
|
||||||
|
QUERY PLAN
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
Custom Scan (Citus Adaptive)
|
||||||
|
Output: remote_scan.a, remote_scan.tdigest_percentile
|
||||||
|
Task Count: 4
|
||||||
|
Tasks Shown: One of 4
|
||||||
|
-> Task
|
||||||
|
Node: host=localhost port=xxxxx dbname=regression
|
||||||
|
-> HashAggregate
|
||||||
|
Output: a, tdigest_percentile(tdigest, '{0.99,0.95}'::double precision[])
|
||||||
|
Group Key: latencies_rollup.a
|
||||||
|
-> Seq Scan on tdigest_aggregate_support.latencies_rollup_20070004 latencies_rollup
|
||||||
|
Output: a, tdigest
|
||||||
|
(11 rows)
|
||||||
|
|
||||||
|
-- explain no grouping to verify partially pushed down for tdigest_precentile_of(value, compression, hypotetical_value)
|
||||||
|
EXPLAIN (COSTS OFF, VERBOSE)
|
||||||
|
SELECT tdigest_percentile_of(tdigest, 9000)
|
||||||
|
FROM latencies_rollup;
|
||||||
|
QUERY PLAN
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
Aggregate
|
||||||
|
Output: tdigest_percentile_of(remote_scan.tdigest_percentile_of, '9000'::double precision)
|
||||||
|
-> Custom Scan (Citus Adaptive)
|
||||||
|
Output: remote_scan.tdigest_percentile_of
|
||||||
|
Task Count: 4
|
||||||
|
Tasks Shown: One of 4
|
||||||
|
-> Task
|
||||||
|
Node: host=localhost port=xxxxx dbname=regression
|
||||||
|
-> Aggregate
|
||||||
|
Output: tdigest(tdigest)
|
||||||
|
-> Seq Scan on tdigest_aggregate_support.latencies_rollup_20070004 latencies_rollup
|
||||||
|
Output: a, tdigest
|
||||||
|
(12 rows)
|
||||||
|
|
||||||
|
-- explain grouping by distribution column is completely pushed down for tdigest_precentile_of(value, compression, hypotetical_value)
|
||||||
|
EXPLAIN (COSTS OFF, VERBOSE)
|
||||||
|
SELECT a, tdigest_percentile_of(tdigest, 9000)
|
||||||
|
FROM latencies_rollup
|
||||||
|
GROUP BY a;
|
||||||
|
QUERY PLAN
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
Custom Scan (Citus Adaptive)
|
||||||
|
Output: remote_scan.a, remote_scan.tdigest_percentile_of
|
||||||
|
Task Count: 4
|
||||||
|
Tasks Shown: One of 4
|
||||||
|
-> Task
|
||||||
|
Node: host=localhost port=xxxxx dbname=regression
|
||||||
|
-> HashAggregate
|
||||||
|
Output: a, tdigest_percentile_of(tdigest, '9000'::double precision)
|
||||||
|
Group Key: latencies_rollup.a
|
||||||
|
-> Seq Scan on tdigest_aggregate_support.latencies_rollup_20070004 latencies_rollup
|
||||||
|
Output: a, tdigest
|
||||||
|
(11 rows)
|
||||||
|
|
||||||
|
-- explain no grouping to verify partially pushed down for tdigest_precentile_of(value, compression, hypotetical_values[])
|
||||||
|
EXPLAIN (COSTS OFF, VERBOSE)
|
||||||
|
SELECT tdigest_percentile_of(tdigest, ARRAY[9000, 9500])
|
||||||
|
FROM latencies_rollup;
|
||||||
|
QUERY PLAN
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
Aggregate
|
||||||
|
Output: tdigest_percentile_of(remote_scan.tdigest_percentile_of, '{9000,9500}'::double precision[])
|
||||||
|
-> Custom Scan (Citus Adaptive)
|
||||||
|
Output: remote_scan.tdigest_percentile_of
|
||||||
|
Task Count: 4
|
||||||
|
Tasks Shown: One of 4
|
||||||
|
-> Task
|
||||||
|
Node: host=localhost port=xxxxx dbname=regression
|
||||||
|
-> Aggregate
|
||||||
|
Output: tdigest(tdigest)
|
||||||
|
-> Seq Scan on tdigest_aggregate_support.latencies_rollup_20070004 latencies_rollup
|
||||||
|
Output: a, tdigest
|
||||||
|
(12 rows)
|
||||||
|
|
||||||
|
-- explain grouping by distribution column is completely pushed down for tdigest_precentile_of(value, compression, hypotetical_values[])
|
||||||
|
EXPLAIN (COSTS OFF, VERBOSE)
|
||||||
|
SELECT a, tdigest_percentile_of(tdigest, ARRAY[9000, 9500])
|
||||||
|
FROM latencies_rollup
|
||||||
|
GROUP BY a;
|
||||||
|
QUERY PLAN
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
Custom Scan (Citus Adaptive)
|
||||||
|
Output: remote_scan.a, remote_scan.tdigest_percentile_of
|
||||||
|
Task Count: 4
|
||||||
|
Tasks Shown: One of 4
|
||||||
|
-> Task
|
||||||
|
Node: host=localhost port=xxxxx dbname=regression
|
||||||
|
-> HashAggregate
|
||||||
|
Output: a, tdigest_percentile_of(tdigest, '{9000,9500}'::double precision[])
|
||||||
|
Group Key: latencies_rollup.a
|
||||||
|
-> Seq Scan on tdigest_aggregate_support.latencies_rollup_20070004 latencies_rollup
|
||||||
|
Output: a, tdigest
|
||||||
|
(11 rows)
|
||||||
|
|
||||||
|
-- verifying results - should be stable due to seed while inserting the data, if failure due to data these queries could be removed or check for certain ranges
|
||||||
|
SELECT tdigest(tdigest) FROM latencies_rollup;
|
||||||
|
tdigest
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
flags 0 count 10000 compression 100 centroids 47 (0.287235, 1) (1.025106, 1) (2.058216, 1) (5.335597, 1) (12.707263, 2) (25.302479, 3) (43.435063, 4) (77.987860, 5) (241.681030, 9) (402.696604, 11) (999.675875, 20) (2310.848640, 27) (4374.387978, 37) (9722.896547, 56) (21713.805492, 87) (39735.065966, 112) (87335.860853, 177) (182744.906162, 262) (336766.886786, 338) (661263.339724, 464) (1228663.222377, 623) (2146097.038498, 805) (2854487.701653, 827) (5292830.156590, 1195) (6168185.834602, 1104) (6399734.303813, 966) (5778088.854724, 773) (5213381.984997, 637) (3763042.148296, 431) (3036786.646485, 333) (1948238.134602, 207) (1456568.605821, 152) (999888.715345, 103) (715935.892988, 73) (543464.906535, 55) (327339.982973, 33) (198853.838033, 20) (159362.743852, 16) (79807.827301, 8) (69877.414438, 7) (49937.731689, 5) (29971.046175, 3) (19982.538737, 2) (9991.467422, 1) (9992.337047, 1) (9995.578357, 1) (9999.700339, 1)
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT tdigest_percentile(tdigest, 0.99) FROM latencies_rollup;
|
||||||
|
tdigest_percentile
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
9903.76070790358
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT tdigest_percentile(tdigest, ARRAY[0.99, 0.95]) FROM latencies_rollup;
|
||||||
|
tdigest_percentile
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
{9903.76070790358,9492.7106302226}
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT tdigest_percentile_of(tdigest, 9000) FROM latencies_rollup;
|
||||||
|
tdigest_percentile_of
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
0.902852659582396
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT tdigest_percentile_of(tdigest, ARRAY[9000, 9500]) FROM latencies_rollup;
|
||||||
|
tdigest_percentile_of
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
{0.902852659582396,0.950865574659141}
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SET client_min_messages TO WARNING; -- suppress cascade messages
|
||||||
|
DROP SCHEMA tdigest_aggregate_support CASCADE;
|
|
@ -0,0 +1,234 @@
|
||||||
|
--
|
||||||
|
-- TDIGEST_AGGREGATE_SUPPORT
|
||||||
|
-- test the integration of github.com/tvondra/tdigest aggregates into the citus planner
|
||||||
|
-- for push down parts of the aggregate to use parallelized execution and reduced data
|
||||||
|
-- transfer sizes for aggregates not grouped by the distribution column
|
||||||
|
--
|
||||||
|
SET citus.next_shard_id TO 20070000;
|
||||||
|
CREATE SCHEMA tdigest_aggregate_support;
|
||||||
|
SET search_path TO tdigest_aggregate_support, public;
|
||||||
|
-- create the tdigest extension when installed
|
||||||
|
SELECT CASE WHEN COUNT(*) > 0
|
||||||
|
THEN 'CREATE EXTENSION tdigest WITH SCHEMA public'
|
||||||
|
ELSE 'SELECT false AS tdigest_present' END
|
||||||
|
AS create_cmd FROM pg_available_extensions()
|
||||||
|
WHERE name = 'tdigest'
|
||||||
|
\gset
|
||||||
|
:create_cmd;
|
||||||
|
tdigest_present
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
f
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SET citus.shard_count TO 4;
|
||||||
|
SET citus.coordinator_aggregation_strategy TO 'disabled'; -- prevent aggregate execution when the aggregate can't be pushed down
|
||||||
|
CREATE TABLE latencies (a int, b int, latency double precision);
|
||||||
|
SELECT create_distributed_table('latencies', 'a');
|
||||||
|
create_distributed_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT setseed(0.42); -- make the random data inserted deterministic
|
||||||
|
setseed
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
INSERT INTO latencies
|
||||||
|
SELECT (random()*20)::int AS a,
|
||||||
|
(random()*20)::int AS b,
|
||||||
|
random()*10000.0 AS latency
|
||||||
|
FROM generate_series(1, 10000);
|
||||||
|
-- explain no grouping to verify partially pushed down for tdigest(value, compression)
|
||||||
|
EXPLAIN (COSTS OFF, VERBOSE)
|
||||||
|
SELECT tdigest(latency, 100)
|
||||||
|
FROM latencies;
|
||||||
|
ERROR: function tdigest(double precision, integer) does not exist
|
||||||
|
HINT: No function matches the given name and argument types. You might need to add explicit type casts.
|
||||||
|
-- explain grouping by distribution column is completely pushed down for tdigest(value, compression)
|
||||||
|
EXPLAIN (COSTS OFF, VERBOSE)
|
||||||
|
SELECT a, tdigest(latency, 100)
|
||||||
|
FROM latencies
|
||||||
|
GROUP BY a;
|
||||||
|
ERROR: function tdigest(double precision, integer) does not exist
|
||||||
|
HINT: No function matches the given name and argument types. You might need to add explicit type casts.
|
||||||
|
-- explain grouping by non-distribution column is partially pushed down for tdigest(value, compression)
|
||||||
|
EXPLAIN (COSTS OFF, VERBOSE)
|
||||||
|
SELECT b, tdigest(latency, 100)
|
||||||
|
FROM latencies
|
||||||
|
GROUP BY b;
|
||||||
|
ERROR: function tdigest(double precision, integer) does not exist
|
||||||
|
HINT: No function matches the given name and argument types. You might need to add explicit type casts.
|
||||||
|
-- explain no grouping to verify partially pushed down for tdigest_precentile(value, compression, quantile)
|
||||||
|
EXPLAIN (COSTS OFF, VERBOSE)
|
||||||
|
SELECT tdigest_percentile(latency, 100, 0.99)
|
||||||
|
FROM latencies;
|
||||||
|
ERROR: function tdigest_percentile(double precision, integer, numeric) does not exist
|
||||||
|
HINT: No function matches the given name and argument types. You might need to add explicit type casts.
|
||||||
|
-- explain grouping by distribution column is completely pushed down for tdigest_precentile(value, compression, quantile)
|
||||||
|
EXPLAIN (COSTS OFF, VERBOSE)
|
||||||
|
SELECT a, tdigest_percentile(latency, 100, 0.99)
|
||||||
|
FROM latencies
|
||||||
|
GROUP BY a;
|
||||||
|
ERROR: function tdigest_percentile(double precision, integer, numeric) does not exist
|
||||||
|
HINT: No function matches the given name and argument types. You might need to add explicit type casts.
|
||||||
|
-- explain grouping by non-distribution column is partially pushed down for tdigest_precentile(value, compression, quantile)
|
||||||
|
EXPLAIN (COSTS OFF, VERBOSE)
|
||||||
|
SELECT b, tdigest_percentile(latency, 100, 0.99)
|
||||||
|
FROM latencies
|
||||||
|
GROUP BY b;
|
||||||
|
ERROR: function tdigest_percentile(double precision, integer, numeric) does not exist
|
||||||
|
HINT: No function matches the given name and argument types. You might need to add explicit type casts.
|
||||||
|
-- explain no grouping to verify partially pushed down for tdigest_precentile(value, compression, quantiles[])
|
||||||
|
EXPLAIN (COSTS OFF, VERBOSE)
|
||||||
|
SELECT tdigest_percentile(latency, 100, ARRAY[0.99, 0.95])
|
||||||
|
FROM latencies;
|
||||||
|
ERROR: function tdigest_percentile(double precision, integer, numeric[]) does not exist
|
||||||
|
HINT: No function matches the given name and argument types. You might need to add explicit type casts.
|
||||||
|
-- explain grouping by distribution column is completely pushed down for tdigest_precentile(value, compression, quantiles[])
|
||||||
|
EXPLAIN (COSTS OFF, VERBOSE)
|
||||||
|
SELECT a, tdigest_percentile(latency, 100, ARRAY[0.99, 0.95])
|
||||||
|
FROM latencies
|
||||||
|
GROUP BY a;
|
||||||
|
ERROR: function tdigest_percentile(double precision, integer, numeric[]) does not exist
|
||||||
|
HINT: No function matches the given name and argument types. You might need to add explicit type casts.
|
||||||
|
-- explain grouping by non-distribution column is partially pushed down for tdigest_precentile(value, compression, quantiles[])
|
||||||
|
EXPLAIN (COSTS OFF, VERBOSE)
|
||||||
|
SELECT b, tdigest_percentile(latency, 100, ARRAY[0.99, 0.95])
|
||||||
|
FROM latencies
|
||||||
|
GROUP BY b;
|
||||||
|
ERROR: function tdigest_percentile(double precision, integer, numeric[]) does not exist
|
||||||
|
HINT: No function matches the given name and argument types. You might need to add explicit type casts.
|
||||||
|
-- explain no grouping to verify partially pushed down for tdigest_precentile_of(value, compression, hypotetical_value)
|
||||||
|
EXPLAIN (COSTS OFF, VERBOSE)
|
||||||
|
SELECT tdigest_percentile_of(latency, 100, 9000)
|
||||||
|
FROM latencies;
|
||||||
|
ERROR: function tdigest_percentile_of(double precision, integer, integer) does not exist
|
||||||
|
HINT: No function matches the given name and argument types. You might need to add explicit type casts.
|
||||||
|
-- explain grouping by distribution column is completely pushed down for tdigest_precentile_of(value, compression, hypotetical_value)
|
||||||
|
EXPLAIN (COSTS OFF, VERBOSE)
|
||||||
|
SELECT a, tdigest_percentile_of(latency, 100, 9000)
|
||||||
|
FROM latencies
|
||||||
|
GROUP BY a;
|
||||||
|
ERROR: function tdigest_percentile_of(double precision, integer, integer) does not exist
|
||||||
|
HINT: No function matches the given name and argument types. You might need to add explicit type casts.
|
||||||
|
-- explain grouping by non-distribution column is partially pushed down for tdigest_precentile_of(value, compression, hypotetical_value)
|
||||||
|
EXPLAIN (COSTS OFF, VERBOSE)
|
||||||
|
SELECT b, tdigest_percentile_of(latency, 100, 9000)
|
||||||
|
FROM latencies
|
||||||
|
GROUP BY b;
|
||||||
|
ERROR: function tdigest_percentile_of(double precision, integer, integer) does not exist
|
||||||
|
HINT: No function matches the given name and argument types. You might need to add explicit type casts.
|
||||||
|
-- explain no grouping to verify partially pushed down for tdigest_precentile_of(value, compression, hypotetical_values[])
|
||||||
|
EXPLAIN (COSTS OFF, VERBOSE)
|
||||||
|
SELECT tdigest_percentile_of(latency, 100, ARRAY[9000, 9500])
|
||||||
|
FROM latencies;
|
||||||
|
ERROR: function tdigest_percentile_of(double precision, integer, integer[]) does not exist
|
||||||
|
HINT: No function matches the given name and argument types. You might need to add explicit type casts.
|
||||||
|
-- explain grouping by distribution column is completely pushed down for tdigest_precentile_of(value, compression, hypotetical_values[])
|
||||||
|
EXPLAIN (COSTS OFF, VERBOSE)
|
||||||
|
SELECT a, tdigest_percentile_of(latency, 100, ARRAY[9000, 9500])
|
||||||
|
FROM latencies
|
||||||
|
GROUP BY a;
|
||||||
|
ERROR: function tdigest_percentile_of(double precision, integer, integer[]) does not exist
|
||||||
|
HINT: No function matches the given name and argument types. You might need to add explicit type casts.
|
||||||
|
-- explain grouping by non-distribution column is partially pushed down for tdigest_precentile_of(value, compression, hypotetical_values[])
|
||||||
|
EXPLAIN (COSTS OFF, VERBOSE)
|
||||||
|
SELECT b, tdigest_percentile_of(latency, 100, ARRAY[9000, 9500])
|
||||||
|
FROM latencies
|
||||||
|
GROUP BY b;
|
||||||
|
ERROR: function tdigest_percentile_of(double precision, integer, integer[]) does not exist
|
||||||
|
HINT: No function matches the given name and argument types. You might need to add explicit type casts.
|
||||||
|
-- verifying results - should be stable due to seed while inserting the data, if failure due to data these queries could be removed or check for certain ranges
|
||||||
|
SELECT tdigest(latency, 100) FROM latencies;
|
||||||
|
ERROR: function tdigest(double precision, integer) does not exist
|
||||||
|
HINT: No function matches the given name and argument types. You might need to add explicit type casts.
|
||||||
|
SELECT tdigest_percentile(latency, 100, 0.99) FROM latencies;
|
||||||
|
ERROR: function tdigest_percentile(double precision, integer, numeric) does not exist
|
||||||
|
HINT: No function matches the given name and argument types. You might need to add explicit type casts.
|
||||||
|
SELECT tdigest_percentile(latency, 100, ARRAY[0.99, 0.95]) FROM latencies;
|
||||||
|
ERROR: function tdigest_percentile(double precision, integer, numeric[]) does not exist
|
||||||
|
HINT: No function matches the given name and argument types. You might need to add explicit type casts.
|
||||||
|
SELECT tdigest_percentile_of(latency, 100, 9000) FROM latencies;
|
||||||
|
ERROR: function tdigest_percentile_of(double precision, integer, integer) does not exist
|
||||||
|
HINT: No function matches the given name and argument types. You might need to add explicit type casts.
|
||||||
|
SELECT tdigest_percentile_of(latency, 100, ARRAY[9000, 9500]) FROM latencies;
|
||||||
|
ERROR: function tdigest_percentile_of(double precision, integer, integer[]) does not exist
|
||||||
|
HINT: No function matches the given name and argument types. You might need to add explicit type casts.
|
||||||
|
CREATE TABLE latencies_rollup (a int, tdigest tdigest);
|
||||||
|
ERROR: type "tdigest" does not exist
|
||||||
|
SELECT create_distributed_table('latencies_rollup', 'a', colocate_with => 'latencies');
|
||||||
|
ERROR: relation "latencies_rollup" does not exist
|
||||||
|
INSERT INTO latencies_rollup
|
||||||
|
SELECT a, tdigest(latency, 100)
|
||||||
|
FROM latencies
|
||||||
|
GROUP BY a;
|
||||||
|
ERROR: relation "latencies_rollup" does not exist
|
||||||
|
EXPLAIN (COSTS OFF, VERBOSE)
|
||||||
|
SELECT tdigest(tdigest)
|
||||||
|
FROM latencies_rollup;
|
||||||
|
ERROR: relation "latencies_rollup" does not exist
|
||||||
|
-- explain grouping by distribution column is completely pushed down for tdigest(tdigest)
|
||||||
|
EXPLAIN (COSTS OFF, VERBOSE)
|
||||||
|
SELECT a, tdigest(tdigest)
|
||||||
|
FROM latencies_rollup
|
||||||
|
GROUP BY a;
|
||||||
|
ERROR: relation "latencies_rollup" does not exist
|
||||||
|
-- explain no grouping to verify partially pushed down for tdigest_precentile(tdigest, quantile)
|
||||||
|
EXPLAIN (COSTS OFF, VERBOSE)
|
||||||
|
SELECT tdigest_percentile(tdigest, 0.99)
|
||||||
|
FROM latencies_rollup;
|
||||||
|
ERROR: relation "latencies_rollup" does not exist
|
||||||
|
-- explain grouping by distribution column is completely pushed down for tdigest_precentile(tdigest, quantile)
|
||||||
|
EXPLAIN (COSTS OFF, VERBOSE)
|
||||||
|
SELECT a, tdigest_percentile(tdigest, 0.99)
|
||||||
|
FROM latencies_rollup
|
||||||
|
GROUP BY a;
|
||||||
|
ERROR: relation "latencies_rollup" does not exist
|
||||||
|
-- explain no grouping to verify partially pushed down for tdigest_precentile(value, compression, quantiles[])
|
||||||
|
EXPLAIN (COSTS OFF, VERBOSE)
|
||||||
|
SELECT tdigest_percentile(tdigest, ARRAY[0.99, 0.95])
|
||||||
|
FROM latencies_rollup;
|
||||||
|
ERROR: relation "latencies_rollup" does not exist
|
||||||
|
-- explain grouping by distribution column is completely pushed down for tdigest_precentile(value, compression, quantiles[])
|
||||||
|
EXPLAIN (COSTS OFF, VERBOSE)
|
||||||
|
SELECT a, tdigest_percentile(tdigest, ARRAY[0.99, 0.95])
|
||||||
|
FROM latencies_rollup
|
||||||
|
GROUP BY a;
|
||||||
|
ERROR: relation "latencies_rollup" does not exist
|
||||||
|
-- explain no grouping to verify partially pushed down for tdigest_precentile_of(value, compression, hypotetical_value)
|
||||||
|
EXPLAIN (COSTS OFF, VERBOSE)
|
||||||
|
SELECT tdigest_percentile_of(tdigest, 9000)
|
||||||
|
FROM latencies_rollup;
|
||||||
|
ERROR: relation "latencies_rollup" does not exist
|
||||||
|
-- explain grouping by distribution column is completely pushed down for tdigest_precentile_of(value, compression, hypotetical_value)
|
||||||
|
EXPLAIN (COSTS OFF, VERBOSE)
|
||||||
|
SELECT a, tdigest_percentile_of(tdigest, 9000)
|
||||||
|
FROM latencies_rollup
|
||||||
|
GROUP BY a;
|
||||||
|
ERROR: relation "latencies_rollup" does not exist
|
||||||
|
-- explain no grouping to verify partially pushed down for tdigest_precentile_of(value, compression, hypotetical_values[])
|
||||||
|
EXPLAIN (COSTS OFF, VERBOSE)
|
||||||
|
SELECT tdigest_percentile_of(tdigest, ARRAY[9000, 9500])
|
||||||
|
FROM latencies_rollup;
|
||||||
|
ERROR: relation "latencies_rollup" does not exist
|
||||||
|
-- explain grouping by distribution column is completely pushed down for tdigest_precentile_of(value, compression, hypotetical_values[])
|
||||||
|
EXPLAIN (COSTS OFF, VERBOSE)
|
||||||
|
SELECT a, tdigest_percentile_of(tdigest, ARRAY[9000, 9500])
|
||||||
|
FROM latencies_rollup
|
||||||
|
GROUP BY a;
|
||||||
|
ERROR: relation "latencies_rollup" does not exist
|
||||||
|
-- verifying results - should be stable due to seed while inserting the data, if failure due to data these queries could be removed or check for certain ranges
|
||||||
|
SELECT tdigest(tdigest) FROM latencies_rollup;
|
||||||
|
ERROR: relation "latencies_rollup" does not exist
|
||||||
|
SELECT tdigest_percentile(tdigest, 0.99) FROM latencies_rollup;
|
||||||
|
ERROR: relation "latencies_rollup" does not exist
|
||||||
|
SELECT tdigest_percentile(tdigest, ARRAY[0.99, 0.95]) FROM latencies_rollup;
|
||||||
|
ERROR: relation "latencies_rollup" does not exist
|
||||||
|
SELECT tdigest_percentile_of(tdigest, 9000) FROM latencies_rollup;
|
||||||
|
ERROR: relation "latencies_rollup" does not exist
|
||||||
|
SELECT tdigest_percentile_of(tdigest, ARRAY[9000, 9500]) FROM latencies_rollup;
|
||||||
|
ERROR: relation "latencies_rollup" does not exist
|
||||||
|
SET client_min_messages TO WARNING; -- suppress cascade messages
|
||||||
|
DROP SCHEMA tdigest_aggregate_support CASCADE;
|
|
@ -93,7 +93,7 @@ test: multi_subquery_in_where_reference_clause full_join adaptive_executor propa
|
||||||
test: multi_subquery_union multi_subquery_in_where_clause multi_subquery_misc
|
test: multi_subquery_union multi_subquery_in_where_clause multi_subquery_misc
|
||||||
test: multi_agg_distinct multi_agg_approximate_distinct multi_limit_clause_approximate multi_outer_join_reference multi_single_relation_subquery multi_prepare_plsql set_role_in_transaction
|
test: multi_agg_distinct multi_agg_approximate_distinct multi_limit_clause_approximate multi_outer_join_reference multi_single_relation_subquery multi_prepare_plsql set_role_in_transaction
|
||||||
test: multi_reference_table multi_select_for_update relation_access_tracking
|
test: multi_reference_table multi_select_for_update relation_access_tracking
|
||||||
test: custom_aggregate_support aggregate_support
|
test: custom_aggregate_support aggregate_support tdigest_aggregate_support
|
||||||
test: multi_average_expression multi_working_columns multi_having_pushdown having_subquery
|
test: multi_average_expression multi_working_columns multi_having_pushdown having_subquery
|
||||||
test: multi_array_agg multi_limit_clause multi_orderby_limit_pushdown
|
test: multi_array_agg multi_limit_clause multi_orderby_limit_pushdown
|
||||||
test: multi_jsonb_agg multi_jsonb_object_agg multi_json_agg multi_json_object_agg bool_agg ch_bench_having chbenchmark_all_queries expression_reference_join
|
test: multi_jsonb_agg multi_jsonb_object_agg multi_json_agg multi_json_object_agg bool_agg ch_bench_having chbenchmark_all_queries expression_reference_join
|
||||||
|
|
|
@ -0,0 +1,195 @@
|
||||||
|
--
|
||||||
|
-- TDIGEST_AGGREGATE_SUPPORT
|
||||||
|
-- test the integration of github.com/tvondra/tdigest aggregates into the citus planner
|
||||||
|
-- for push down parts of the aggregate to use parallelized execution and reduced data
|
||||||
|
-- transfer sizes for aggregates not grouped by the distribution column
|
||||||
|
--
|
||||||
|
|
||||||
|
SET citus.next_shard_id TO 20070000;
|
||||||
|
CREATE SCHEMA tdigest_aggregate_support;
|
||||||
|
SET search_path TO tdigest_aggregate_support, public;
|
||||||
|
|
||||||
|
-- create the tdigest extension when installed
|
||||||
|
SELECT CASE WHEN COUNT(*) > 0
|
||||||
|
THEN 'CREATE EXTENSION tdigest WITH SCHEMA public'
|
||||||
|
ELSE 'SELECT false AS tdigest_present' END
|
||||||
|
AS create_cmd FROM pg_available_extensions()
|
||||||
|
WHERE name = 'tdigest'
|
||||||
|
\gset
|
||||||
|
:create_cmd;
|
||||||
|
|
||||||
|
SET citus.shard_count TO 4;
|
||||||
|
SET citus.coordinator_aggregation_strategy TO 'disabled'; -- prevent aggregate execution when the aggregate can't be pushed down
|
||||||
|
|
||||||
|
CREATE TABLE latencies (a int, b int, latency double precision);
|
||||||
|
SELECT create_distributed_table('latencies', 'a');
|
||||||
|
SELECT setseed(0.42); -- make the random data inserted deterministic
|
||||||
|
INSERT INTO latencies
|
||||||
|
SELECT (random()*20)::int AS a,
|
||||||
|
(random()*20)::int AS b,
|
||||||
|
random()*10000.0 AS latency
|
||||||
|
FROM generate_series(1, 10000);
|
||||||
|
|
||||||
|
-- explain no grouping to verify partially pushed down for tdigest(value, compression)
|
||||||
|
EXPLAIN (COSTS OFF, VERBOSE)
|
||||||
|
SELECT tdigest(latency, 100)
|
||||||
|
FROM latencies;
|
||||||
|
|
||||||
|
-- explain grouping by distribution column is completely pushed down for tdigest(value, compression)
|
||||||
|
EXPLAIN (COSTS OFF, VERBOSE)
|
||||||
|
SELECT a, tdigest(latency, 100)
|
||||||
|
FROM latencies
|
||||||
|
GROUP BY a;
|
||||||
|
|
||||||
|
-- explain grouping by non-distribution column is partially pushed down for tdigest(value, compression)
|
||||||
|
EXPLAIN (COSTS OFF, VERBOSE)
|
||||||
|
SELECT b, tdigest(latency, 100)
|
||||||
|
FROM latencies
|
||||||
|
GROUP BY b;
|
||||||
|
|
||||||
|
-- explain no grouping to verify partially pushed down for tdigest_precentile(value, compression, quantile)
|
||||||
|
EXPLAIN (COSTS OFF, VERBOSE)
|
||||||
|
SELECT tdigest_percentile(latency, 100, 0.99)
|
||||||
|
FROM latencies;
|
||||||
|
|
||||||
|
-- explain grouping by distribution column is completely pushed down for tdigest_precentile(value, compression, quantile)
|
||||||
|
EXPLAIN (COSTS OFF, VERBOSE)
|
||||||
|
SELECT a, tdigest_percentile(latency, 100, 0.99)
|
||||||
|
FROM latencies
|
||||||
|
GROUP BY a;
|
||||||
|
|
||||||
|
-- explain grouping by non-distribution column is partially pushed down for tdigest_precentile(value, compression, quantile)
|
||||||
|
EXPLAIN (COSTS OFF, VERBOSE)
|
||||||
|
SELECT b, tdigest_percentile(latency, 100, 0.99)
|
||||||
|
FROM latencies
|
||||||
|
GROUP BY b;
|
||||||
|
|
||||||
|
-- explain no grouping to verify partially pushed down for tdigest_precentile(value, compression, quantiles[])
|
||||||
|
EXPLAIN (COSTS OFF, VERBOSE)
|
||||||
|
SELECT tdigest_percentile(latency, 100, ARRAY[0.99, 0.95])
|
||||||
|
FROM latencies;
|
||||||
|
|
||||||
|
-- explain grouping by distribution column is completely pushed down for tdigest_precentile(value, compression, quantiles[])
|
||||||
|
EXPLAIN (COSTS OFF, VERBOSE)
|
||||||
|
SELECT a, tdigest_percentile(latency, 100, ARRAY[0.99, 0.95])
|
||||||
|
FROM latencies
|
||||||
|
GROUP BY a;
|
||||||
|
|
||||||
|
-- explain grouping by non-distribution column is partially pushed down for tdigest_precentile(value, compression, quantiles[])
|
||||||
|
EXPLAIN (COSTS OFF, VERBOSE)
|
||||||
|
SELECT b, tdigest_percentile(latency, 100, ARRAY[0.99, 0.95])
|
||||||
|
FROM latencies
|
||||||
|
GROUP BY b;
|
||||||
|
|
||||||
|
-- explain no grouping to verify partially pushed down for tdigest_precentile_of(value, compression, hypotetical_value)
|
||||||
|
EXPLAIN (COSTS OFF, VERBOSE)
|
||||||
|
SELECT tdigest_percentile_of(latency, 100, 9000)
|
||||||
|
FROM latencies;
|
||||||
|
|
||||||
|
-- explain grouping by distribution column is completely pushed down for tdigest_precentile_of(value, compression, hypotetical_value)
|
||||||
|
EXPLAIN (COSTS OFF, VERBOSE)
|
||||||
|
SELECT a, tdigest_percentile_of(latency, 100, 9000)
|
||||||
|
FROM latencies
|
||||||
|
GROUP BY a;
|
||||||
|
|
||||||
|
-- explain grouping by non-distribution column is partially pushed down for tdigest_precentile_of(value, compression, hypotetical_value)
|
||||||
|
EXPLAIN (COSTS OFF, VERBOSE)
|
||||||
|
SELECT b, tdigest_percentile_of(latency, 100, 9000)
|
||||||
|
FROM latencies
|
||||||
|
GROUP BY b;
|
||||||
|
|
||||||
|
-- explain no grouping to verify partially pushed down for tdigest_precentile_of(value, compression, hypotetical_values[])
|
||||||
|
EXPLAIN (COSTS OFF, VERBOSE)
|
||||||
|
SELECT tdigest_percentile_of(latency, 100, ARRAY[9000, 9500])
|
||||||
|
FROM latencies;
|
||||||
|
|
||||||
|
-- explain grouping by distribution column is completely pushed down for tdigest_precentile_of(value, compression, hypotetical_values[])
|
||||||
|
EXPLAIN (COSTS OFF, VERBOSE)
|
||||||
|
SELECT a, tdigest_percentile_of(latency, 100, ARRAY[9000, 9500])
|
||||||
|
FROM latencies
|
||||||
|
GROUP BY a;
|
||||||
|
|
||||||
|
-- explain grouping by non-distribution column is partially pushed down for tdigest_precentile_of(value, compression, hypotetical_values[])
|
||||||
|
EXPLAIN (COSTS OFF, VERBOSE)
|
||||||
|
SELECT b, tdigest_percentile_of(latency, 100, ARRAY[9000, 9500])
|
||||||
|
FROM latencies
|
||||||
|
GROUP BY b;
|
||||||
|
|
||||||
|
-- verifying results - should be stable due to seed while inserting the data, if failure due to data these queries could be removed or check for certain ranges
|
||||||
|
SELECT tdigest(latency, 100) FROM latencies;
|
||||||
|
SELECT tdigest_percentile(latency, 100, 0.99) FROM latencies;
|
||||||
|
SELECT tdigest_percentile(latency, 100, ARRAY[0.99, 0.95]) FROM latencies;
|
||||||
|
SELECT tdigest_percentile_of(latency, 100, 9000) FROM latencies;
|
||||||
|
SELECT tdigest_percentile_of(latency, 100, ARRAY[9000, 9500]) FROM latencies;
|
||||||
|
|
||||||
|
CREATE TABLE latencies_rollup (a int, tdigest tdigest);
|
||||||
|
SELECT create_distributed_table('latencies_rollup', 'a', colocate_with => 'latencies');
|
||||||
|
|
||||||
|
INSERT INTO latencies_rollup
|
||||||
|
SELECT a, tdigest(latency, 100)
|
||||||
|
FROM latencies
|
||||||
|
GROUP BY a;
|
||||||
|
|
||||||
|
EXPLAIN (COSTS OFF, VERBOSE)
|
||||||
|
SELECT tdigest(tdigest)
|
||||||
|
FROM latencies_rollup;
|
||||||
|
|
||||||
|
-- explain grouping by distribution column is completely pushed down for tdigest(tdigest)
|
||||||
|
EXPLAIN (COSTS OFF, VERBOSE)
|
||||||
|
SELECT a, tdigest(tdigest)
|
||||||
|
FROM latencies_rollup
|
||||||
|
GROUP BY a;
|
||||||
|
|
||||||
|
-- explain no grouping to verify partially pushed down for tdigest_precentile(tdigest, quantile)
|
||||||
|
EXPLAIN (COSTS OFF, VERBOSE)
|
||||||
|
SELECT tdigest_percentile(tdigest, 0.99)
|
||||||
|
FROM latencies_rollup;
|
||||||
|
|
||||||
|
-- explain grouping by distribution column is completely pushed down for tdigest_precentile(tdigest, quantile)
|
||||||
|
EXPLAIN (COSTS OFF, VERBOSE)
|
||||||
|
SELECT a, tdigest_percentile(tdigest, 0.99)
|
||||||
|
FROM latencies_rollup
|
||||||
|
GROUP BY a;
|
||||||
|
|
||||||
|
-- explain no grouping to verify partially pushed down for tdigest_precentile(value, compression, quantiles[])
|
||||||
|
EXPLAIN (COSTS OFF, VERBOSE)
|
||||||
|
SELECT tdigest_percentile(tdigest, ARRAY[0.99, 0.95])
|
||||||
|
FROM latencies_rollup;
|
||||||
|
|
||||||
|
-- explain grouping by distribution column is completely pushed down for tdigest_precentile(value, compression, quantiles[])
|
||||||
|
EXPLAIN (COSTS OFF, VERBOSE)
|
||||||
|
SELECT a, tdigest_percentile(tdigest, ARRAY[0.99, 0.95])
|
||||||
|
FROM latencies_rollup
|
||||||
|
GROUP BY a;
|
||||||
|
|
||||||
|
-- explain no grouping to verify partially pushed down for tdigest_precentile_of(value, compression, hypotetical_value)
|
||||||
|
EXPLAIN (COSTS OFF, VERBOSE)
|
||||||
|
SELECT tdigest_percentile_of(tdigest, 9000)
|
||||||
|
FROM latencies_rollup;
|
||||||
|
|
||||||
|
-- explain grouping by distribution column is completely pushed down for tdigest_precentile_of(value, compression, hypotetical_value)
|
||||||
|
EXPLAIN (COSTS OFF, VERBOSE)
|
||||||
|
SELECT a, tdigest_percentile_of(tdigest, 9000)
|
||||||
|
FROM latencies_rollup
|
||||||
|
GROUP BY a;
|
||||||
|
|
||||||
|
-- explain no grouping to verify partially pushed down for tdigest_precentile_of(value, compression, hypotetical_values[])
|
||||||
|
EXPLAIN (COSTS OFF, VERBOSE)
|
||||||
|
SELECT tdigest_percentile_of(tdigest, ARRAY[9000, 9500])
|
||||||
|
FROM latencies_rollup;
|
||||||
|
|
||||||
|
-- explain grouping by distribution column is completely pushed down for tdigest_precentile_of(value, compression, hypotetical_values[])
|
||||||
|
EXPLAIN (COSTS OFF, VERBOSE)
|
||||||
|
SELECT a, tdigest_percentile_of(tdigest, ARRAY[9000, 9500])
|
||||||
|
FROM latencies_rollup
|
||||||
|
GROUP BY a;
|
||||||
|
|
||||||
|
-- verifying results - should be stable due to seed while inserting the data, if failure due to data these queries could be removed or check for certain ranges
|
||||||
|
SELECT tdigest(tdigest) FROM latencies_rollup;
|
||||||
|
SELECT tdigest_percentile(tdigest, 0.99) FROM latencies_rollup;
|
||||||
|
SELECT tdigest_percentile(tdigest, ARRAY[0.99, 0.95]) FROM latencies_rollup;
|
||||||
|
SELECT tdigest_percentile_of(tdigest, 9000) FROM latencies_rollup;
|
||||||
|
SELECT tdigest_percentile_of(tdigest, ARRAY[9000, 9500]) FROM latencies_rollup;
|
||||||
|
|
||||||
|
SET client_min_messages TO WARNING; -- suppress cascade messages
|
||||||
|
DROP SCHEMA tdigest_aggregate_support CASCADE;
|
Loading…
Reference in New Issue