Add jsonb_agg aggregate support

pull/597/head
Marco Slot 2016-06-02 00:32:05 +02:00
parent 27bf5f2ab2
commit ad1824fea2
9 changed files with 132 additions and 9 deletions

View File

@ -6,7 +6,7 @@ citus_top_builddir = ../../..
MODULE_big = citus
EXTENSION = citus
EXTVERSIONS = 5.0 5.0-1 5.0-2 \
5.1-1 5.1-2 5.1-3
5.1-1 5.1-2 5.1-3 5.1-4
# All citus--*.sql files in the source directory
DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql))
@ -39,6 +39,8 @@ $(EXTENSION)--5.1-2.sql: $(EXTENSION)--5.1-1.sql $(EXTENSION)--5.1-1--5.1-2.sql
cat $^ > $@
$(EXTENSION)--5.1-3.sql: $(EXTENSION)--5.1-2.sql $(EXTENSION)--5.1-2--5.1-3.sql
cat $^ > $@
$(EXTENSION)--5.1-4.sql: $(EXTENSION)--5.1-3.sql $(EXTENSION)--5.1-3--5.1-4.sql
cat $^ > $@
NO_PGXS = 1

View File

@ -0,0 +1,6 @@
CREATE AGGREGATE pg_catalog.jsonb_concat_agg(jsonb) (
SFUNC = jsonb_concat,
STYPE = jsonb
);
COMMENT ON AGGREGATE pg_catalog.jsonb_concat_agg(jsonb)
IS 'concatenate JSONB objects';

View File

@ -1,6 +1,6 @@
# Citus extension
comment = 'Citus distributed database'
default_version = '5.1-3'
default_version = '5.1-4'
module_pathname = '$libdir/citus'
relocatable = false
schema = pg_catalog

View File

@ -1501,17 +1501,21 @@ MasterAggregateExpression(Aggref *originalAggregate,
newMasterExpression = (Expr *) newMasterAggregate;
}
else if (aggregateType == AGGREGATE_ARRAY_AGG)
else if (aggregateType == AGGREGATE_ARRAY_AGG ||
aggregateType == AGGREGATE_JSONB_AGG)
{
/*
* Array aggregates are handled in two steps. First, we compute array_agg()
* on the worker nodes. Then, we gather the arrays on the master and
* compute the array_cat_agg() aggregate on them to get the final array.
* compute the array_cat_agg() aggregate on them to get the final array,
* or use the equivalent functions for jsonb_agg().
*/
Var *column = NULL;
TargetEntry *arrayCatAggArgument = NULL;
Aggref *newMasterAggregate = NULL;
Oid aggregateFunctionId = InvalidOid;
char *masterAggregateName = NULL;
Oid masterArgumentTypeId = InvalidOid;
/* worker aggregate and original aggregate have same return type */
Oid workerReturnType = exprType((Node *) originalAggregate);
@ -1522,9 +1526,19 @@ MasterAggregateExpression(Aggref *originalAggregate,
Assert(!originalAggregate->aggorder);
Assert(!originalAggregate->aggdistinct);
/* array_cat_agg() takes anyarray as input */
aggregateFunctionId = AggregateFunctionOid(ARRAY_CAT_AGGREGATE_NAME,
ANYARRAYOID);
if (aggregateType == AGGREGATE_JSONB_AGG)
{
masterAggregateName = JSONB_CONCAT_AGGREGATE_NAME;
masterArgumentTypeId = JSONBOID;
}
else
{
masterAggregateName = ARRAY_CAT_AGGREGATE_NAME;
masterArgumentTypeId = ANYARRAYOID;
}
aggregateFunctionId = AggregateFunctionOid(masterAggregateName,
masterArgumentTypeId);
/* create argument for the array_cat_agg() aggregate */
column = makeVar(masterTableId, walkerContext->columnId, workerReturnType,

View File

@ -23,6 +23,7 @@
#define DISABLE_LIMIT_APPROXIMATION -1
#define DISABLE_DISTINCT_APPROXIMATION 0.0
#define ARRAY_CAT_AGGREGATE_NAME "array_cat_agg"
#define JSONB_CONCAT_AGGREGATE_NAME "jsonb_concat_agg"
#define WORKER_COLUMN_FORMAT "worker_column_%d"
/* Definitions related to count(distinct) approximations */
@ -54,7 +55,8 @@ typedef enum
AGGREGATE_MAX = 3,
AGGREGATE_SUM = 4,
AGGREGATE_COUNT = 5,
AGGREGATE_ARRAY_AGG = 6
AGGREGATE_ARRAY_AGG = 6,
AGGREGATE_JSONB_AGG = 7
} AggregateType;
@ -96,7 +98,7 @@ typedef enum
*/
static const char *const AggregateNames[] = {
"invalid", "avg", "min", "max", "sum",
"count", "array_agg"
"count", "array_agg", "jsonb_agg"
};

View File

@ -141,3 +141,92 @@ SELECT array_agg(l_orderkey) FROM lineitem WHERE l_quantity < 0;
(1 row)
-- Check jsonb_concat_agg() aggregate which is used to implement jsonb_agg()
SELECT jsonb_concat_agg(i) FROM (
VALUES ('[1,2]'::jsonb), ('[]'::jsonb), ('[{"a":"b"}]'::jsonb)) AS t(i);
jsonb_concat_agg
--------------------
[1, 2, {"a": "b"}]
(1 row)
-- Check jsonb_agg()
SELECT jsonb_pretty(jsonb_agg(row_to_json(lineitem))) FROM lineitem
WHERE l_receiptdate = '1994-06-03' AND l_linenumber = 1;
jsonb_pretty
-----------------------------------------------------------------
[ +
{ +
"l_tax": 0.01, +
"l_comment": "endencies. sl", +
"l_partkey": 92865, +
"l_suppkey": 7884, +
"l_discount": 0.06, +
"l_orderkey": 9156, +
"l_quantity": 5.00, +
"l_shipdate": "1994-05-31", +
"l_shipmode": "REG AIR ", +
"l_commitdate": "1994-03-20", +
"l_linenumber": 1, +
"l_linestatus": "F", +
"l_returnflag": "A", +
"l_receiptdate": "1994-06-03", +
"l_shipinstruct": "NONE ", +
"l_extendedprice": 9289.30 +
}, +
{ +
"l_tax": 0.00, +
"l_comment": "ages sleep according to th", +
"l_partkey": 132438, +
"l_suppkey": 9978, +
"l_discount": 0.01, +
"l_orderkey": 9894, +
"l_quantity": 23.00, +
"l_shipdate": "1994-05-06", +
"l_shipmode": "SHIP ", +
"l_commitdate": "1994-04-26", +
"l_linenumber": 1, +
"l_linestatus": "F", +
"l_returnflag": "A", +
"l_receiptdate": "1994-06-03", +
"l_shipinstruct": "DELIVER IN PERSON ", +
"l_extendedprice": 33819.89 +
}, +
{ +
"l_tax": 0.00, +
"l_comment": " regular decoys wake fluffily. theodoli",+
"l_partkey": 113938, +
"l_suppkey": 3939, +
"l_discount": 0.03, +
"l_orderkey": 14274, +
"l_quantity": 44.00, +
"l_shipdate": "1994-05-05", +
"l_shipmode": "TRUCK ", +
"l_commitdate": "1994-03-12", +
"l_linenumber": 1, +
"l_linestatus": "F", +
"l_returnflag": "A", +
"l_receiptdate": "1994-06-03", +
"l_shipinstruct": "NONE ", +
"l_extendedprice": 85884.92 +
}, +
{ +
"l_tax": 0.07, +
"l_comment": "cajole. silent accounts nod iron", +
"l_partkey": 20029, +
"l_suppkey": 30, +
"l_discount": 0.04, +
"l_orderkey": 14916, +
"l_quantity": 25.00, +
"l_shipdate": "1994-05-07", +
"l_shipmode": "FOB ", +
"l_commitdate": "1994-04-20", +
"l_linenumber": 1, +
"l_linestatus": "F", +
"l_returnflag": "A", +
"l_receiptdate": "1994-06-03", +
"l_shipinstruct": "TAKE BACK RETURN ", +
"l_extendedprice": 23725.50 +
} +
]
(1 row)

View File

@ -17,6 +17,7 @@ ALTER EXTENSION citus UPDATE TO '5.0-2';
ALTER EXTENSION citus UPDATE TO '5.1-1';
ALTER EXTENSION citus UPDATE TO '5.1-2';
ALTER EXTENSION citus UPDATE TO '5.1-3';
ALTER EXTENSION citus UPDATE TO '5.1-4';
-- drop extension an re-create in newest version
DROP EXTENSION citus;
\c

View File

@ -62,3 +62,11 @@ SELECT array_agg(case when l_quantity > 20 then l_quantity else NULL end)
-- Check that we return NULL in case there are no input rows to array_agg()
SELECT array_agg(l_orderkey) FROM lineitem WHERE l_quantity < 0;
-- Check jsonb_concat_agg() aggregate which is used to implement jsonb_agg()
SELECT jsonb_concat_agg(i) FROM (
VALUES ('[1,2]'::jsonb), ('[]'::jsonb), ('[{"a":"b"}]'::jsonb)) AS t(i);
-- Check jsonb_agg()
SELECT jsonb_pretty(jsonb_agg(row_to_json(lineitem))) FROM lineitem
WHERE l_receiptdate = '1994-06-03' AND l_linenumber = 1;

View File

@ -22,6 +22,7 @@ ALTER EXTENSION citus UPDATE TO '5.0-2';
ALTER EXTENSION citus UPDATE TO '5.1-1';
ALTER EXTENSION citus UPDATE TO '5.1-2';
ALTER EXTENSION citus UPDATE TO '5.1-3';
ALTER EXTENSION citus UPDATE TO '5.1-4';
-- drop extension an re-create in newest version
DROP EXTENSION citus;