From 1d953a1c77b4ecaeb919e4e0718731927555ee72 Mon Sep 17 00:00:00 2001 From: Nils Dijk Date: Tue, 1 Jun 2021 17:03:22 +0200 Subject: [PATCH] implement pushdown of tdigest api's with count --- .../planner/multi_logical_optimizer.c | 118 +++++++++++++++++- .../distributed/planner/tdigest_extension.c | 73 +++++++++++ .../distributed/multi_logical_optimizer.h | 11 +- src/include/distributed/tdigest_extension.h | 7 ++ 4 files changed, 206 insertions(+), 3 deletions(-) diff --git a/src/backend/distributed/planner/multi_logical_optimizer.c b/src/backend/distributed/planner/multi_logical_optimizer.c index 615c0ddbe..1b534d48c 100644 --- a/src/backend/distributed/planner/multi_logical_optimizer.c +++ b/src/backend/distributed/planner/multi_logical_optimizer.c @@ -1959,7 +1959,8 @@ MasterAggregateExpression(Aggref *originalAggregate, newMasterExpression = (Expr *) unionAggregate; } else if (aggregateType == AGGREGATE_TDIGEST_COMBINE || - aggregateType == AGGREGATE_TDIGEST_ADD_DOUBLE) + aggregateType == AGGREGATE_TDIGEST_ADD_DOUBLE || + aggregateType == AGGREGATE_TDIGEST_ADD_COUNT_DOUBLE) { /* tdigest of column */ Oid tdigestType = TDigestExtensionTypeOid(); /* tdigest type */ @@ -2044,6 +2045,60 @@ MasterAggregateExpression(Aggref *originalAggregate, newMasterExpression = (Expr *) unionAggregate; } + else if (aggregateType == AGGREGATE_TDIGEST_PERCENTILE_ADD_COUNT_DOUBLE || + aggregateType == AGGREGATE_TDIGEST_PERCENTILE_ADD_COUNT_DOUBLEARRAY || + aggregateType == AGGREGATE_TDIGEST_PERCENTILE_OF_ADD_COUNT_DOUBLE || + aggregateType == AGGREGATE_TDIGEST_PERCENTILE_OF_ADD_COUNT_DOUBLEARRAY) + { + /* tdigest of column */ + Oid tdigestType = TDigestExtensionTypeOid(); + Oid unionFunctionId = InvalidOid; + if (aggregateType == AGGREGATE_TDIGEST_PERCENTILE_ADD_COUNT_DOUBLE) + { + unionFunctionId = TDigestExtensionAggTDigestPercentile2(); + } + else if (aggregateType == AGGREGATE_TDIGEST_PERCENTILE_ADD_COUNT_DOUBLEARRAY) + { + unionFunctionId = TDigestExtensionAggTDigestPercentile2a(); + } + else if (aggregateType == AGGREGATE_TDIGEST_PERCENTILE_OF_ADD_COUNT_DOUBLE) + { + unionFunctionId = TDigestExtensionAggTDigestPercentileOf2(); + } + else if (aggregateType == AGGREGATE_TDIGEST_PERCENTILE_OF_ADD_COUNT_DOUBLEARRAY) + { + unionFunctionId = TDigestExtensionAggTDigestPercentileOf2a(); + } + Assert(OidIsValid(unionFunctionId)); + + int32 tdigestReturnTypeMod = exprTypmod((Node *) originalAggregate); + Oid tdigestTypeCollationId = exprCollation((Node *) originalAggregate); + + /* create first argument for tdigest_precentile(tdigest, double) */ + Var *tdigestColumn = makeVar(masterTableId, walkerContext->columnId, tdigestType, + tdigestReturnTypeMod, tdigestTypeCollationId, + columnLevelsUp); + TargetEntry *tdigestTargetEntry = makeTargetEntry((Expr *) tdigestColumn, + argumentId, NULL, false); + walkerContext->columnId++; + + /* construct the master tdigest_precentile(tdigest, double) expression */ + Aggref *unionAggregate = makeNode(Aggref); + unionAggregate->aggfnoid = unionFunctionId; + unionAggregate->aggtype = originalAggregate->aggtype; + unionAggregate->args = list_make2( + tdigestTargetEntry, + list_nth(originalAggregate->args, 3)); + unionAggregate->aggkind = AGGKIND_NORMAL; + unionAggregate->aggfilter = NULL; + unionAggregate->aggtranstype = InvalidOid; + unionAggregate->aggargtypes = list_make2_oid( + tdigestType, + list_nth_oid(originalAggregate->aggargtypes, 2)); + unionAggregate->aggsplit = AGGSPLIT_SIMPLE; + + newMasterExpression = (Expr *) unionAggregate; + } else if (aggregateType == AGGREGATE_TDIGEST_PERCENTILE_TDIGEST_DOUBLE || aggregateType == AGGREGATE_TDIGEST_PERCENTILE_TDIGEST_DOUBLEARRAY || aggregateType == AGGREGATE_TDIGEST_PERCENTILE_OF_TDIGEST_DOUBLE || @@ -3209,6 +3264,42 @@ WorkerAggregateExpressionList(Aggref *originalAggregate, workerAggregateList = lappend(workerAggregateList, newWorkerAggregate); } + else if (aggregateType == AGGREGATE_TDIGEST_PERCENTILE_ADD_COUNT_DOUBLE || + aggregateType == AGGREGATE_TDIGEST_PERCENTILE_ADD_COUNT_DOUBLEARRAY || + aggregateType == AGGREGATE_TDIGEST_PERCENTILE_OF_ADD_COUNT_DOUBLE || + aggregateType == AGGREGATE_TDIGEST_PERCENTILE_OF_ADD_COUNT_DOUBLEARRAY) + { + /* + * The original query has an aggregate in the form of either + * - tdigest_percentile(column, count, compression, quantile) + * - tdigest_percentile(column, count, compression, quantile[]) + * - tdigest_percentile_of(column, count, compression, value) + * - tdigest_percentile_of(column, count, compression, value[]) + * + * We are creating the worker part of this query by creating a + * - tdigest(column, count, compression) + * + * One could see we are passing argument 0, 1 and 2 from the original query + * in here. This corresponds with the list_nth calls in the args and aggargstypes + * list construction. The tdigest function and type are read from the catalog. + */ + Aggref *newWorkerAggregate = copyObject(originalAggregate); + newWorkerAggregate->aggfnoid = TDigestExtensionAggTDigest3(); + newWorkerAggregate->aggtype = TDigestExtensionTypeOid(); + newWorkerAggregate->args = list_make3( + list_nth(newWorkerAggregate->args, 0), + list_nth(newWorkerAggregate->args, 1), + list_nth(newWorkerAggregate->args, 2)); + newWorkerAggregate->aggkind = AGGKIND_NORMAL; + newWorkerAggregate->aggtranstype = InvalidOid; + newWorkerAggregate->aggargtypes = list_make3_oid( + list_nth_oid(newWorkerAggregate->aggargtypes, 0), + list_nth_oid(newWorkerAggregate->aggargtypes, 1), + list_nth_oid(newWorkerAggregate->aggargtypes, 2)); + newWorkerAggregate->aggsplit = AGGSPLIT_SIMPLE; + + workerAggregateList = lappend(workerAggregateList, newWorkerAggregate); + } else if (aggregateType == AGGREGATE_TDIGEST_PERCENTILE_TDIGEST_DOUBLE || aggregateType == AGGREGATE_TDIGEST_PERCENTILE_TDIGEST_DOUBLEARRAY || aggregateType == AGGREGATE_TDIGEST_PERCENTILE_OF_TDIGEST_DOUBLE || @@ -3389,6 +3480,21 @@ GetAggregateType(Aggref *aggregateExpression) return AGGREGATE_TDIGEST_ADD_DOUBLE; } + if (aggFunctionId == TDigestExtensionAggTDigest3()) + { + return AGGREGATE_TDIGEST_ADD_COUNT_DOUBLE; + } + + if (aggFunctionId == TDigestExtensionAggTDigestPercentile4()) + { + return AGGREGATE_TDIGEST_PERCENTILE_ADD_COUNT_DOUBLE; + } + + if (aggFunctionId == TDigestExtensionAggTDigestPercentile4a()) + { + return AGGREGATE_TDIGEST_PERCENTILE_ADD_COUNT_DOUBLEARRAY; + } + if (aggFunctionId == TDigestExtensionAggTDigestPercentile3()) { return AGGREGATE_TDIGEST_PERCENTILE_ADD_DOUBLE; @@ -3409,6 +3515,16 @@ GetAggregateType(Aggref *aggregateExpression) return AGGREGATE_TDIGEST_PERCENTILE_TDIGEST_DOUBLEARRAY; } + if (aggFunctionId == TDigestExtensionAggTDigestPercentileOf4()) + { + return AGGREGATE_TDIGEST_PERCENTILE_OF_ADD_COUNT_DOUBLE; + } + + if (aggFunctionId == TDigestExtensionAggTDigestPercentileOf4a()) + { + return AGGREGATE_TDIGEST_PERCENTILE_OF_ADD_COUNT_DOUBLEARRAY; + } + if (aggFunctionId == TDigestExtensionAggTDigestPercentileOf3()) { return AGGREGATE_TDIGEST_PERCENTILE_OF_ADD_DOUBLE; diff --git a/src/backend/distributed/planner/tdigest_extension.c b/src/backend/distributed/planner/tdigest_extension.c index 123b170d4..6f44594de 100644 --- a/src/backend/distributed/planner/tdigest_extension.c +++ b/src/backend/distributed/planner/tdigest_extension.c @@ -129,6 +129,19 @@ TDigestExtensionAggTDigest2() } +/* + * TDigestExtensionAggTDigest3 performs a lookup for the Oid of the tdigest aggregate; + * tdigest(value double precision, count bigint, compression int) + * + * If the aggregate is not found InvalidOid is returned. + */ +Oid +TDigestExtensionAggTDigest3() +{ + return LookupTDigestFunction("tdigest", 3, (Oid[]) { FLOAT8OID, INT8OID, INT4OID }); +} + + /* * TDigestExtensionAggTDigestPercentile2 performs a lookup for the Oid of the tdigest * aggregate; @@ -189,6 +202,36 @@ TDigestExtensionAggTDigestPercentile3a(void) } +/* + * TDigestExtensionAggTDigestPercentile4 performs a lookup for the Oid of the tdigest + * aggregate; + * tdigest_percentile(double precision, bigint, int, double precision) + * + * If the aggregate is not found InvalidOid is returned. + */ +Oid +TDigestExtensionAggTDigestPercentile4() +{ + return LookupTDigestFunction("tdigest_percentile", 4, + (Oid[]) { FLOAT8OID, INT8OID, INT4OID, FLOAT8OID }); +} + + +/* + * TDigestExtensionAggTDigestPercentile4a performs a lookup for the Oid of the tdigest + * aggregate; + * tdigest_percentile(double precision, bigint, int, double precision[]) + * + * If the aggregate is not found InvalidOid is returned. + */ +Oid +TDigestExtensionAggTDigestPercentile4a(void) +{ + return LookupTDigestFunction("tdigest_percentile", 4, + (Oid[]) { FLOAT8OID, INT8OID, INT4OID, FLOAT8ARRAYOID }); +} + + /* * TDigestExtensionAggTDigestPercentileOf2 performs a lookup for the Oid of the tdigest * aggregate; @@ -247,3 +290,33 @@ TDigestExtensionAggTDigestPercentileOf3a(void) return LookupTDigestFunction("tdigest_percentile_of", 3, (Oid[]) { FLOAT8OID, INT4OID, FLOAT8ARRAYOID }); } + + +/* + * TDigestExtensionAggTDigestPercentileOf4 performs a lookup for the Oid of the tdigest + * aggregate; + * tdigest_percentile_of(double precision, bigint, int, double precision) + * + * If the aggregate is not found InvalidOid is returned. + */ +Oid +TDigestExtensionAggTDigestPercentileOf4() +{ + return LookupTDigestFunction("tdigest_percentile_of", 4, + (Oid[]) { FLOAT8OID, INT8OID, INT4OID, FLOAT8OID }); +} + + +/* + * TDigestExtensionAggTDigestPercentileOf4a performs a lookup for the Oid of the tdigest + * aggregate; + * tdigest_percentile_of(double precision, bigint, int, double precision[]) + * + * If the aggregate is not found InvalidOid is returned. + */ +Oid +TDigestExtensionAggTDigestPercentileOf4a(void) +{ + return LookupTDigestFunction("tdigest_percentile_of", 4, + (Oid[]) { FLOAT8OID, INT8OID, INT4OID, FLOAT8ARRAYOID }); +} diff --git a/src/include/distributed/multi_logical_optimizer.h b/src/include/distributed/multi_logical_optimizer.h index 9e6167959..cacf4d2e5 100644 --- a/src/include/distributed/multi_logical_optimizer.h +++ b/src/include/distributed/multi_logical_optimizer.h @@ -92,9 +92,16 @@ typedef enum AGGREGATE_TDIGEST_PERCENTILE_OF_TDIGEST_DOUBLE = 29, AGGREGATE_TDIGEST_PERCENTILE_OF_TDIGEST_DOUBLEARRAY = 30, + /* tdigest v1.2.0 API */ + AGGREGATE_TDIGEST_ADD_COUNT_DOUBLE = 31, + AGGREGATE_TDIGEST_PERCENTILE_ADD_COUNT_DOUBLE = 32, + AGGREGATE_TDIGEST_PERCENTILE_ADD_COUNT_DOUBLEARRAY = 33, + AGGREGATE_TDIGEST_PERCENTILE_OF_ADD_COUNT_DOUBLE = 34, + AGGREGATE_TDIGEST_PERCENTILE_OF_ADD_COUNT_DOUBLEARRAY = 35, + /* AGGREGATE_CUSTOM must come last */ - AGGREGATE_CUSTOM_COMBINE = 31, - AGGREGATE_CUSTOM_ROW_GATHER = 32, + AGGREGATE_CUSTOM_COMBINE = 36, + AGGREGATE_CUSTOM_ROW_GATHER = 37, } AggregateType; diff --git a/src/include/distributed/tdigest_extension.h b/src/include/distributed/tdigest_extension.h index d333369d7..26ce0490e 100644 --- a/src/include/distributed/tdigest_extension.h +++ b/src/include/distributed/tdigest_extension.h @@ -24,4 +24,11 @@ extern Oid TDigestExtensionAggTDigestPercentileOf2a(void); extern Oid TDigestExtensionAggTDigestPercentileOf3(void); extern Oid TDigestExtensionAggTDigestPercentileOf3a(void); +/* v1.2.0 introduced API */ +extern Oid TDigestExtensionAggTDigest3(void); +extern Oid TDigestExtensionAggTDigestPercentile4(void); +extern Oid TDigestExtensionAggTDigestPercentile4a(void); +extern Oid TDigestExtensionAggTDigestPercentileOf4(void); +extern Oid TDigestExtensionAggTDigestPercentileOf4a(void); + #endif /* CITUS_TDIGEST_EXTENSION_H */