implement pushdown of tdigest api's with count

feature/batch-add-tdigest
Nils Dijk 2021-06-01 17:03:22 +02:00
parent 572899b359
commit 1d953a1c77
No known key found for this signature in database
GPG Key ID: CA1177EF9434F241
4 changed files with 206 additions and 3 deletions

View File

@ -1959,7 +1959,8 @@ MasterAggregateExpression(Aggref *originalAggregate,
newMasterExpression = (Expr *) unionAggregate;
}
else if (aggregateType == AGGREGATE_TDIGEST_COMBINE ||
aggregateType == AGGREGATE_TDIGEST_ADD_DOUBLE)
aggregateType == AGGREGATE_TDIGEST_ADD_DOUBLE ||
aggregateType == AGGREGATE_TDIGEST_ADD_COUNT_DOUBLE)
{
/* tdigest of column */
Oid tdigestType = TDigestExtensionTypeOid(); /* tdigest type */
@ -2044,6 +2045,60 @@ MasterAggregateExpression(Aggref *originalAggregate,
newMasterExpression = (Expr *) unionAggregate;
}
else if (aggregateType == AGGREGATE_TDIGEST_PERCENTILE_ADD_COUNT_DOUBLE ||
aggregateType == AGGREGATE_TDIGEST_PERCENTILE_ADD_COUNT_DOUBLEARRAY ||
aggregateType == AGGREGATE_TDIGEST_PERCENTILE_OF_ADD_COUNT_DOUBLE ||
aggregateType == AGGREGATE_TDIGEST_PERCENTILE_OF_ADD_COUNT_DOUBLEARRAY)
{
/* tdigest of column */
Oid tdigestType = TDigestExtensionTypeOid();
Oid unionFunctionId = InvalidOid;
if (aggregateType == AGGREGATE_TDIGEST_PERCENTILE_ADD_COUNT_DOUBLE)
{
unionFunctionId = TDigestExtensionAggTDigestPercentile2();
}
else if (aggregateType == AGGREGATE_TDIGEST_PERCENTILE_ADD_COUNT_DOUBLEARRAY)
{
unionFunctionId = TDigestExtensionAggTDigestPercentile2a();
}
else if (aggregateType == AGGREGATE_TDIGEST_PERCENTILE_OF_ADD_COUNT_DOUBLE)
{
unionFunctionId = TDigestExtensionAggTDigestPercentileOf2();
}
else if (aggregateType == AGGREGATE_TDIGEST_PERCENTILE_OF_ADD_COUNT_DOUBLEARRAY)
{
unionFunctionId = TDigestExtensionAggTDigestPercentileOf2a();
}
Assert(OidIsValid(unionFunctionId));
int32 tdigestReturnTypeMod = exprTypmod((Node *) originalAggregate);
Oid tdigestTypeCollationId = exprCollation((Node *) originalAggregate);
/* create first argument for tdigest_precentile(tdigest, double) */
Var *tdigestColumn = makeVar(masterTableId, walkerContext->columnId, tdigestType,
tdigestReturnTypeMod, tdigestTypeCollationId,
columnLevelsUp);
TargetEntry *tdigestTargetEntry = makeTargetEntry((Expr *) tdigestColumn,
argumentId, NULL, false);
walkerContext->columnId++;
/* construct the master tdigest_precentile(tdigest, double) expression */
Aggref *unionAggregate = makeNode(Aggref);
unionAggregate->aggfnoid = unionFunctionId;
unionAggregate->aggtype = originalAggregate->aggtype;
unionAggregate->args = list_make2(
tdigestTargetEntry,
list_nth(originalAggregate->args, 3));
unionAggregate->aggkind = AGGKIND_NORMAL;
unionAggregate->aggfilter = NULL;
unionAggregate->aggtranstype = InvalidOid;
unionAggregate->aggargtypes = list_make2_oid(
tdigestType,
list_nth_oid(originalAggregate->aggargtypes, 2));
unionAggregate->aggsplit = AGGSPLIT_SIMPLE;
newMasterExpression = (Expr *) unionAggregate;
}
else if (aggregateType == AGGREGATE_TDIGEST_PERCENTILE_TDIGEST_DOUBLE ||
aggregateType == AGGREGATE_TDIGEST_PERCENTILE_TDIGEST_DOUBLEARRAY ||
aggregateType == AGGREGATE_TDIGEST_PERCENTILE_OF_TDIGEST_DOUBLE ||
@ -3209,6 +3264,42 @@ WorkerAggregateExpressionList(Aggref *originalAggregate,
workerAggregateList = lappend(workerAggregateList, newWorkerAggregate);
}
else if (aggregateType == AGGREGATE_TDIGEST_PERCENTILE_ADD_COUNT_DOUBLE ||
aggregateType == AGGREGATE_TDIGEST_PERCENTILE_ADD_COUNT_DOUBLEARRAY ||
aggregateType == AGGREGATE_TDIGEST_PERCENTILE_OF_ADD_COUNT_DOUBLE ||
aggregateType == AGGREGATE_TDIGEST_PERCENTILE_OF_ADD_COUNT_DOUBLEARRAY)
{
/*
* The original query has an aggregate in the form of either
* - tdigest_percentile(column, count, compression, quantile)
* - tdigest_percentile(column, count, compression, quantile[])
* - tdigest_percentile_of(column, count, compression, value)
* - tdigest_percentile_of(column, count, compression, value[])
*
* We are creating the worker part of this query by creating a
* - tdigest(column, count, compression)
*
* One could see we are passing argument 0, 1 and 2 from the original query
* in here. This corresponds with the list_nth calls in the args and aggargstypes
* list construction. The tdigest function and type are read from the catalog.
*/
Aggref *newWorkerAggregate = copyObject(originalAggregate);
newWorkerAggregate->aggfnoid = TDigestExtensionAggTDigest3();
newWorkerAggregate->aggtype = TDigestExtensionTypeOid();
newWorkerAggregate->args = list_make3(
list_nth(newWorkerAggregate->args, 0),
list_nth(newWorkerAggregate->args, 1),
list_nth(newWorkerAggregate->args, 2));
newWorkerAggregate->aggkind = AGGKIND_NORMAL;
newWorkerAggregate->aggtranstype = InvalidOid;
newWorkerAggregate->aggargtypes = list_make3_oid(
list_nth_oid(newWorkerAggregate->aggargtypes, 0),
list_nth_oid(newWorkerAggregate->aggargtypes, 1),
list_nth_oid(newWorkerAggregate->aggargtypes, 2));
newWorkerAggregate->aggsplit = AGGSPLIT_SIMPLE;
workerAggregateList = lappend(workerAggregateList, newWorkerAggregate);
}
else if (aggregateType == AGGREGATE_TDIGEST_PERCENTILE_TDIGEST_DOUBLE ||
aggregateType == AGGREGATE_TDIGEST_PERCENTILE_TDIGEST_DOUBLEARRAY ||
aggregateType == AGGREGATE_TDIGEST_PERCENTILE_OF_TDIGEST_DOUBLE ||
@ -3389,6 +3480,21 @@ GetAggregateType(Aggref *aggregateExpression)
return AGGREGATE_TDIGEST_ADD_DOUBLE;
}
if (aggFunctionId == TDigestExtensionAggTDigest3())
{
return AGGREGATE_TDIGEST_ADD_COUNT_DOUBLE;
}
if (aggFunctionId == TDigestExtensionAggTDigestPercentile4())
{
return AGGREGATE_TDIGEST_PERCENTILE_ADD_COUNT_DOUBLE;
}
if (aggFunctionId == TDigestExtensionAggTDigestPercentile4a())
{
return AGGREGATE_TDIGEST_PERCENTILE_ADD_COUNT_DOUBLEARRAY;
}
if (aggFunctionId == TDigestExtensionAggTDigestPercentile3())
{
return AGGREGATE_TDIGEST_PERCENTILE_ADD_DOUBLE;
@ -3409,6 +3515,16 @@ GetAggregateType(Aggref *aggregateExpression)
return AGGREGATE_TDIGEST_PERCENTILE_TDIGEST_DOUBLEARRAY;
}
if (aggFunctionId == TDigestExtensionAggTDigestPercentileOf4())
{
return AGGREGATE_TDIGEST_PERCENTILE_OF_ADD_COUNT_DOUBLE;
}
if (aggFunctionId == TDigestExtensionAggTDigestPercentileOf4a())
{
return AGGREGATE_TDIGEST_PERCENTILE_OF_ADD_COUNT_DOUBLEARRAY;
}
if (aggFunctionId == TDigestExtensionAggTDigestPercentileOf3())
{
return AGGREGATE_TDIGEST_PERCENTILE_OF_ADD_DOUBLE;

View File

@ -129,6 +129,19 @@ TDigestExtensionAggTDigest2()
}
/*
* TDigestExtensionAggTDigest3 performs a lookup for the Oid of the tdigest aggregate;
* tdigest(value double precision, count bigint, compression int)
*
* If the aggregate is not found InvalidOid is returned.
*/
Oid
TDigestExtensionAggTDigest3()
{
return LookupTDigestFunction("tdigest", 3, (Oid[]) { FLOAT8OID, INT8OID, INT4OID });
}
/*
* TDigestExtensionAggTDigestPercentile2 performs a lookup for the Oid of the tdigest
* aggregate;
@ -189,6 +202,36 @@ TDigestExtensionAggTDigestPercentile3a(void)
}
/*
* TDigestExtensionAggTDigestPercentile4 performs a lookup for the Oid of the tdigest
* aggregate;
* tdigest_percentile(double precision, bigint, int, double precision)
*
* If the aggregate is not found InvalidOid is returned.
*/
Oid
TDigestExtensionAggTDigestPercentile4()
{
return LookupTDigestFunction("tdigest_percentile", 4,
(Oid[]) { FLOAT8OID, INT8OID, INT4OID, FLOAT8OID });
}
/*
* TDigestExtensionAggTDigestPercentile4a performs a lookup for the Oid of the tdigest
* aggregate;
* tdigest_percentile(double precision, bigint, int, double precision[])
*
* If the aggregate is not found InvalidOid is returned.
*/
Oid
TDigestExtensionAggTDigestPercentile4a(void)
{
return LookupTDigestFunction("tdigest_percentile", 4,
(Oid[]) { FLOAT8OID, INT8OID, INT4OID, FLOAT8ARRAYOID });
}
/*
* TDigestExtensionAggTDigestPercentileOf2 performs a lookup for the Oid of the tdigest
* aggregate;
@ -247,3 +290,33 @@ TDigestExtensionAggTDigestPercentileOf3a(void)
return LookupTDigestFunction("tdigest_percentile_of", 3,
(Oid[]) { FLOAT8OID, INT4OID, FLOAT8ARRAYOID });
}
/*
* TDigestExtensionAggTDigestPercentileOf4 performs a lookup for the Oid of the tdigest
* aggregate;
* tdigest_percentile_of(double precision, bigint, int, double precision)
*
* If the aggregate is not found InvalidOid is returned.
*/
Oid
TDigestExtensionAggTDigestPercentileOf4()
{
return LookupTDigestFunction("tdigest_percentile_of", 4,
(Oid[]) { FLOAT8OID, INT8OID, INT4OID, FLOAT8OID });
}
/*
* TDigestExtensionAggTDigestPercentileOf4a performs a lookup for the Oid of the tdigest
* aggregate;
* tdigest_percentile_of(double precision, bigint, int, double precision[])
*
* If the aggregate is not found InvalidOid is returned.
*/
Oid
TDigestExtensionAggTDigestPercentileOf4a(void)
{
return LookupTDigestFunction("tdigest_percentile_of", 4,
(Oid[]) { FLOAT8OID, INT8OID, INT4OID, FLOAT8ARRAYOID });
}

View File

@ -92,9 +92,16 @@ typedef enum
AGGREGATE_TDIGEST_PERCENTILE_OF_TDIGEST_DOUBLE = 29,
AGGREGATE_TDIGEST_PERCENTILE_OF_TDIGEST_DOUBLEARRAY = 30,
/* tdigest v1.2.0 API */
AGGREGATE_TDIGEST_ADD_COUNT_DOUBLE = 31,
AGGREGATE_TDIGEST_PERCENTILE_ADD_COUNT_DOUBLE = 32,
AGGREGATE_TDIGEST_PERCENTILE_ADD_COUNT_DOUBLEARRAY = 33,
AGGREGATE_TDIGEST_PERCENTILE_OF_ADD_COUNT_DOUBLE = 34,
AGGREGATE_TDIGEST_PERCENTILE_OF_ADD_COUNT_DOUBLEARRAY = 35,
/* AGGREGATE_CUSTOM must come last */
AGGREGATE_CUSTOM_COMBINE = 31,
AGGREGATE_CUSTOM_ROW_GATHER = 32,
AGGREGATE_CUSTOM_COMBINE = 36,
AGGREGATE_CUSTOM_ROW_GATHER = 37,
} AggregateType;

View File

@ -24,4 +24,11 @@ extern Oid TDigestExtensionAggTDigestPercentileOf2a(void);
extern Oid TDigestExtensionAggTDigestPercentileOf3(void);
extern Oid TDigestExtensionAggTDigestPercentileOf3a(void);
/* v1.2.0 introduced API */
extern Oid TDigestExtensionAggTDigest3(void);
extern Oid TDigestExtensionAggTDigestPercentile4(void);
extern Oid TDigestExtensionAggTDigestPercentile4a(void);
extern Oid TDigestExtensionAggTDigestPercentileOf4(void);
extern Oid TDigestExtensionAggTDigestPercentileOf4a(void);
#endif /* CITUS_TDIGEST_EXTENSION_H */